I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
They don`t have the same schema, so I have to loop over them one at a time.

This works, but is`s very slow. This process takes 5 days!

So now I have tried to run this functions in a ThreadPool. But it don`t
seems to work.


*Start local spark. The system have 16 cores and 64 GB.*

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
e.g. 4015976448
memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster('local[{}]'.format(number_cores))
    conf \
      .set('spark.driver.memory', '{}g'.format(memory_gb)) \
      .set("spark.sql.repl.eagerEval.enabled", "True") \
      .set("spark.sql.adaptive.enabled", "True") \
      .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer") \
      .set("spark.sql.repl.eagerEval.maxNumRows", "10000")

    return
SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

spark = get_spark_session("Falk", SparkConf())


*Function to rename columns with \\ *

# We take a dataframe and return a new one with required changes
def cleanDataFrame(df: DataFrame) -> DataFrame:
    # Returns a new sanitized field name (this function can be anything
really)
    def sanitizeFieldName(s: str) -> str:
        return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
            .replace("[", "_").replace("]", "_").replace(".", "_")

    # We call this on all fields to create a copy and to perform any
changes we might
    # want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types
        field.dataType = cleanSchema(field.dataType)
        return field

    def cleanSchema(dataType: [DataType]) -> [DateType]:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse otherwise we can
return since
        # we've reached the leaf node
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top level fields
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now since we have the new schema we can create a new DataFrame by
using the old Frame's RDD as data and the new schema as the schema for the
data
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))



*Function to flatten out a nested dataframe.*


from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
        .. versionadded:: x.X.X

        Parameters
        ----------
        sep : str
            Delimiter for flatted columns. Default `_`

        Notes
        -----
        Don`t use `.` as `sep`
        It won't work on nested data frames with more than one level.
        And you will have to use `columns.name`.

        Flattening Map Types will have to find every key in the column.
        This can be slow.

        Examples
        --------

        data_mixed = [
            {
                "state": "Florida",
                "shortname": "FL",
                "info": {"governor": "Rick Scott"},
                "counties": [
                    {"name": "Dade", "population": 12345},
                    {"name": "Broward", "population": 40000},
                    {"name": "Palm Beach", "population": 60000},
                ],
            },
            {
                "state": "Ohio",
                "shortname": "OH",
                "info": {"governor": "John Kasich"},
                "counties": [
                    {"name": "Summit", "population": 1234},
                    {"name": "Cuyahoga", "population": 1337},
                ],
            },
        ]

        data_mixed = spark.createDataFrame(data=data_mixed)

        data_mixed.printSchema()

        root
        |-- counties: array (nullable = true)
        |    |-- element: map (containsNull = true)
        |    |    |-- key: string
        |    |    |-- value: string (valueContainsNull = true)
        |-- info: map (nullable = true)
        |    |-- key: string
        |    |-- value: string (valueContainsNull = true)
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)


        data_mixed_flat = flatten_test(df, sep=":")
        data_mixed_flat.printSchema()
        root
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- counties:name: string (nullable = true)
        |-- counties:population: string (nullable = true)
        |-- info:governor: string (nullable = true)




        data = [
            {
                "id": 1,
                "name": "Cole Volk",
                "fitness": {"height": 130, "weight": 60},
            },
            {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
            {
                "id": 2,
                "name": "Faye Raker",
                "fitness": {"height": 130, "weight": 60},
            },
        ]


        df = spark.createDataFrame(data=data)

        df.printSchema()

        root
        |-- fitness: map (nullable = true)
        |    |-- key: string
        |    |-- value: long (valueContainsNull = true)
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)

        df_flat = flatten_test(df, sep=":")

        df_flat.printSchema()

        root
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        |-- fitness:height: long (nullable = true)
        |-- fitness:weight: long (nullable = true)

        data_struct = [
                (("James",None,"Smith"),"OH","M"),
                (("Anna","Rose",""),"NY","F"),
                (("Julia","","Williams"),"OH","F"),
                (("Maria","Anne","Jones"),"NY","M"),
                (("Jen","Mary","Brown"),"NY","M"),
                (("Mike","Mary","Williams"),"OH","M")
                ]


        schema = StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
                ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
            ])

        df_struct = spark.createDataFrame(data = data_struct, schema =
schema)

        df_struct.printSchema()

        root
        |-- name: struct (nullable = true)
        |    |-- firstname: string (nullable = true)
        |    |-- middlename: string (nullable = true)
        |    |-- lastname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)

        df_struct_flat = flatten_test(df_struct, sep=":")

        df_struct_flat.printSchema()

        root
        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)
        |-- name:firstname: string (nullable = true)
        |-- name:middlename: string (nullable = true)
        |-- name:lastname: string (nullable = true)
        """
    # compute Complex Fields (Arrays, Structs and Maptypes) in Schema
    complex_fields = dict([(field.name, field.dataType)
                            for field in df.schema.fields
                            if type(field.dataType) == ArrayType
                            or type(field.dataType) == StructType
                            or type(field.dataType) == MapType])

    while len(complex_fields) !=0:
        col_name = list(complex_fields.keys())[0]
        #print ("Processing :"+col_name+" Type :
"+str(type(complex_fields[col_name])))

        # if StructType then convert all sub element to columns.
        # i.e. flatten structs
        if (type(complex_fields[col_name]) == StructType):
            expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
            for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the
explode function
        # i.e. explode Arrays
        elif (type(complex_fields[col_name]) == ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # if MapType then convert all sub element to columns.
        # i.e. flatten
        elif (type(complex_fields[col_name]) == MapType):
            keys_df =
df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(map(lambda f: col(col_name).getItem(f)
            .alias(str(col_name + sep + f)), keys))
            drop_column_list = [col_name]
            df = df.select([col_name for col_name in df.columns
            if col_name not in drop_column_list] + key_cols)

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                            for field in df.schema.fields
                            if type(field.dataType) == ArrayType
                            or type(field.dataType) == StructType
                            or type(field.dataType) == MapType])

    return df


*Function to read each file, and apply the functions and save each file as
JSON.*

def json_to_norm_with_null(dir_path, path_to_save):
    path = dir_path

    for filename in os.listdir(path):
        if not filename.endswith('._stript_list.json'):
            continue


        fullname = os.path.join(path, filename)
        with open(fullname) as json_file:
            jsonstr = json.load(json_file)

        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        df.write.mode('append').option('compression',
'snappy').option("ignoreNullFields", "false").json(path_to_save)


*Function to start everything of. With hopefully 10 processes.*

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=10)

tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
'/home/jovyan/notebooks/falk/F02.json'))


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to