Pool.map requires 2 arguments. 1st a function and 2nd an iterable i.e.
list, set etc.
Check out examples from official docs how to use it:
https://docs.python.org/3/library/multiprocessing.html


On Thu, 21 Jul 2022, 21:25 Bjørn Jørgensen, <bjornjorgen...@gmail.com>
wrote:

> Thank you.
> The reason for using spark local is to test the code, and as in this case
> I find the bottlenecks and fix them before I spinn up a K8S cluster.
>
> I did test it now with
> 16 cores and 10 files
>
> import time
>
> tic = time.perf_counter()
> json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
> '/home/jovyan/notebooks/falk/test/test.json')
> toc = time.perf_counter()
> print(f"Func run in {toc - tic:0.4f} seconds")
>
> Func run in 30.3695 seconds
>
>
> then I stop spark and stat it with setMaster('local[1]')
>
> and now
>
> Func run in 30.8168 seconds
>
>
> Which means that it don`t matter if I run this code on one core or on a
> K8S cluster with 100 cores.
>
> So I tested the same with
>
> from multiprocessing.pool import ThreadPool
> import multiprocessing as mp
>
>
> if __name__ == "__main__":
>     tic = time.perf_counter()
>     pool = ThreadPool(mp.cpu_count())
>     opt =
> pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
> '/home/jovyan/notebooks/falk/test/test.json'))
>     toc = time.perf_counter()
>     print(f"Func run in {toc - tic:0.4f} seconds")
>
> I get the same files and they are ok.
> But I also get this error
>
> TypeError                                 Traceback (most recent call last)
> Input In [33], in <cell line: 5>()      6 tic = time.perf_counter()      7 
> pool = ThreadPool(mp.cpu_count())----> 8 opt = 
> pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test", 
> '/home/jovyan/notebooks/falk/test/test.json'))      9 toc = 
> time.perf_counter()     10 print(f"Func run in {toc - tic:0.4f} seconds")
>
>
> TypeError: Pool.map() missing 1 required positional argument: 'iterable'
>
> So any hints on what to change? :)
>
> Spark has the pandas on spark API, and that is realy great. I prefer
> pandas on spark API and pyspark over pandas.
>
> tor. 21. jul. 2022 kl. 09:18 skrev Khalid Mammadov <
> khalidmammad...@gmail.com>:
>
>> One quick observation is that you allocate all your local CPUs to Spark
>> then execute that app with 10 Threads i.e 10 spark apps and so you will
>> need 160cores in total as each will need 16CPUs IMHO. Wouldn't that create
>> CPU bottleneck?
>>
>> Also on the side note, why you need Spark if you use that on local only?
>> Sparks power can only be (mainly) observed in a cluster env.
>> I have achieved great parallelism using pandas and pools on a local
>> machine in the past.
>>
>>
>> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, <bjornjorgen...@gmail.com>
>> wrote:
>>
>>> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
>>> They don`t have the same schema, so I have to loop over them one at a
>>> time.
>>>
>>> This works, but is`s very slow. This process takes 5 days!
>>>
>>> So now I have tried to run this functions in a ThreadPool. But it don`t
>>> seems to work.
>>>
>>>
>>> *Start local spark. The system have 16 cores and 64 GB.*
>>>
>>> number_cores = int(multiprocessing.cpu_count())
>>>
>>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
>>> e.g. 4015976448
>>> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>>>
>>>
>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>     conf.setMaster('local[{}]'.format(number_cores))
>>>     conf \
>>>       .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>>       .set("spark.sql.repl.eagerEval.enabled", "True") \
>>>       .set("spark.sql.adaptive.enabled", "True") \
>>>       .set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer") \
>>>       .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>>>
>>>     return
>>> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>
>>> spark = get_spark_session("Falk", SparkConf())
>>>
>>>
>>> *Function to rename columns with \\ *
>>>
>>> # We take a dataframe and return a new one with required changes
>>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>>>     # Returns a new sanitized field name (this function can be anything
>>> really)
>>>     def sanitizeFieldName(s: str) -> str:
>>>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>>>             .replace("[", "_").replace("]", "_").replace(".", "_")
>>>
>>>     # We call this on all fields to create a copy and to perform any
>>> changes we might
>>>     # want to do to the field.
>>>     def sanitizeField(field: StructField) -> StructField:
>>>         field = copy(field)
>>>         field.name = sanitizeFieldName(field.name)
>>>         # We recursively call cleanSchema on all types
>>>         field.dataType = cleanSchema(field.dataType)
>>>         return field
>>>
>>>     def cleanSchema(dataType: [DataType]) -> [DateType]:
>>>         dataType = copy(dataType)
>>>         # If the type is a StructType we need to recurse otherwise we
>>> can return since
>>>         # we've reached the leaf node
>>>         if isinstance(dataType, StructType):
>>>             # We call our sanitizer for all top level fields
>>>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>>>         elif isinstance(dataType, ArrayType):
>>>             dataType.elementType = cleanSchema(dataType.elementType)
>>>         return dataType
>>>
>>>     # Now since we have the new schema we can create a new DataFrame by
>>> using the old Frame's RDD as data and the new schema as the schema for the
>>> data
>>>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>>>
>>>
>>>
>>> *Function to flatten out a nested dataframe.*
>>>
>>>
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>>
>>>
>>> def flatten_test(df, sep="_"):
>>>     """Returns a flattened dataframe.
>>>         .. versionadded:: x.X.X
>>>
>>>         Parameters
>>>         ----------
>>>         sep : str
>>>             Delimiter for flatted columns. Default `_`
>>>
>>>         Notes
>>>         -----
>>>         Don`t use `.` as `sep`
>>>         It won't work on nested data frames with more than one level.
>>>         And you will have to use `columns.name`.
>>>
>>>         Flattening Map Types will have to find every key in the column.
>>>         This can be slow.
>>>
>>>         Examples
>>>         --------
>>>
>>>         data_mixed = [
>>>             {
>>>                 "state": "Florida",
>>>                 "shortname": "FL",
>>>                 "info": {"governor": "Rick Scott"},
>>>                 "counties": [
>>>                     {"name": "Dade", "population": 12345},
>>>                     {"name": "Broward", "population": 40000},
>>>                     {"name": "Palm Beach", "population": 60000},
>>>                 ],
>>>             },
>>>             {
>>>                 "state": "Ohio",
>>>                 "shortname": "OH",
>>>                 "info": {"governor": "John Kasich"},
>>>                 "counties": [
>>>                     {"name": "Summit", "population": 1234},
>>>                     {"name": "Cuyahoga", "population": 1337},
>>>                 ],
>>>             },
>>>         ]
>>>
>>>         data_mixed = spark.createDataFrame(data=data_mixed)
>>>
>>>         data_mixed.printSchema()
>>>
>>>         root
>>>         |-- counties: array (nullable = true)
>>>         |    |-- element: map (containsNull = true)
>>>         |    |    |-- key: string
>>>         |    |    |-- value: string (valueContainsNull = true)
>>>         |-- info: map (nullable = true)
>>>         |    |-- key: string
>>>         |    |-- value: string (valueContainsNull = true)
>>>         |-- shortname: string (nullable = true)
>>>         |-- state: string (nullable = true)
>>>
>>>
>>>         data_mixed_flat = flatten_test(df, sep=":")
>>>         data_mixed_flat.printSchema()
>>>         root
>>>         |-- shortname: string (nullable = true)
>>>         |-- state: string (nullable = true)
>>>         |-- counties:name: string (nullable = true)
>>>         |-- counties:population: string (nullable = true)
>>>         |-- info:governor: string (nullable = true)
>>>
>>>
>>>
>>>
>>>         data = [
>>>             {
>>>                 "id": 1,
>>>                 "name": "Cole Volk",
>>>                 "fitness": {"height": 130, "weight": 60},
>>>             },
>>>             {"name": "Mark Reg", "fitness": {"height": 130, "weight":
>>> 60}},
>>>             {
>>>                 "id": 2,
>>>                 "name": "Faye Raker",
>>>                 "fitness": {"height": 130, "weight": 60},
>>>             },
>>>         ]
>>>
>>>
>>>         df = spark.createDataFrame(data=data)
>>>
>>>         df.printSchema()
>>>
>>>         root
>>>         |-- fitness: map (nullable = true)
>>>         |    |-- key: string
>>>         |    |-- value: long (valueContainsNull = true)
>>>         |-- id: long (nullable = true)
>>>         |-- name: string (nullable = true)
>>>
>>>         df_flat = flatten_test(df, sep=":")
>>>
>>>         df_flat.printSchema()
>>>
>>>         root
>>>         |-- id: long (nullable = true)
>>>         |-- name: string (nullable = true)
>>>         |-- fitness:height: long (nullable = true)
>>>         |-- fitness:weight: long (nullable = true)
>>>
>>>         data_struct = [
>>>                 (("James",None,"Smith"),"OH","M"),
>>>                 (("Anna","Rose",""),"NY","F"),
>>>                 (("Julia","","Williams"),"OH","F"),
>>>                 (("Maria","Anne","Jones"),"NY","M"),
>>>                 (("Jen","Mary","Brown"),"NY","M"),
>>>                 (("Mike","Mary","Williams"),"OH","M")
>>>                 ]
>>>
>>>
>>>         schema = StructType([
>>>             StructField('name', StructType([
>>>                 StructField('firstname', StringType(), True),
>>>                 StructField('middlename', StringType(), True),
>>>                 StructField('lastname', StringType(), True)
>>>                 ])),
>>>             StructField('state', StringType(), True),
>>>             StructField('gender', StringType(), True)
>>>             ])
>>>
>>>         df_struct = spark.createDataFrame(data = data_struct, schema =
>>> schema)
>>>
>>>         df_struct.printSchema()
>>>
>>>         root
>>>         |-- name: struct (nullable = true)
>>>         |    |-- firstname: string (nullable = true)
>>>         |    |-- middlename: string (nullable = true)
>>>         |    |-- lastname: string (nullable = true)
>>>         |-- state: string (nullable = true)
>>>         |-- gender: string (nullable = true)
>>>
>>>         df_struct_flat = flatten_test(df_struct, sep=":")
>>>
>>>         df_struct_flat.printSchema()
>>>
>>>         root
>>>         |-- state: string (nullable = true)
>>>         |-- gender: string (nullable = true)
>>>         |-- name:firstname: string (nullable = true)
>>>         |-- name:middlename: string (nullable = true)
>>>         |-- name:lastname: string (nullable = true)
>>>         """
>>>     # compute Complex Fields (Arrays, Structs and Maptypes) in Schema
>>>     complex_fields = dict([(field.name, field.dataType)
>>>                             for field in df.schema.fields
>>>                             if type(field.dataType) == ArrayType
>>>                             or type(field.dataType) == StructType
>>>                             or type(field.dataType) == MapType])
>>>
>>>     while len(complex_fields) !=0:
>>>         col_name = list(complex_fields.keys())[0]
>>>         #print ("Processing :"+col_name+" Type :
>>> "+str(type(complex_fields[col_name])))
>>>
>>>         # if StructType then convert all sub element to columns.
>>>         # i.e. flatten structs
>>>         if (type(complex_fields[col_name]) == StructType):
>>>             expanded = [col(col_name + '.' + k).alias(col_name + sep +
>>> k)
>>>             for k in [n.name for n in complex_fields[col_name]]]
>>>             df = df.select("*", *expanded).drop(col_name)
>>>
>>>         # if ArrayType then add the Array Elements as Rows using the
>>> explode function
>>>         # i.e. explode Arrays
>>>         elif (type(complex_fields[col_name]) == ArrayType):
>>>             df = df.withColumn(col_name, explode_outer(col_name))
>>>
>>>         # if MapType then convert all sub element to columns.
>>>         # i.e. flatten
>>>         elif (type(complex_fields[col_name]) == MapType):
>>>             keys_df =
>>> df.select(explode_outer(map_keys(col(col_name)))).distinct()
>>>             keys = list(map(lambda row: row[0], keys_df.collect()))
>>>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>>>             .alias(str(col_name + sep + f)), keys))
>>>             drop_column_list = [col_name]
>>>             df = df.select([col_name for col_name in df.columns
>>>             if col_name not in drop_column_list] + key_cols)
>>>
>>>         # recompute remaining Complex Fields in Schema
>>>         complex_fields = dict([(field.name, field.dataType)
>>>                             for field in df.schema.fields
>>>                             if type(field.dataType) == ArrayType
>>>                             or type(field.dataType) == StructType
>>>                             or type(field.dataType) == MapType])
>>>
>>>     return df
>>>
>>>
>>> *Function to read each file, and apply the functions and save each file
>>> as JSON.*
>>>
>>> def json_to_norm_with_null(dir_path, path_to_save):
>>>     path = dir_path
>>>
>>>     for filename in os.listdir(path):
>>>         if not filename.endswith('._stript_list.json'):
>>>             continue
>>>
>>>
>>>         fullname = os.path.join(path, filename)
>>>         with open(fullname) as json_file:
>>>             jsonstr = json.load(json_file)
>>>
>>>         df = spark.read.json(fullname)
>>>         df = cleanDataFrame(df)
>>>         df = flatten_test(df, sep=":")
>>>         df.write.mode('append').option('compression',
>>> 'snappy').option("ignoreNullFields", "false").json(path_to_save)
>>>
>>>
>>> *Function to start everything of. With hopefully 10 processes.*
>>>
>>> from multiprocessing.pool import ThreadPool
>>> tpool = ThreadPool(processes=10)
>>>
>>> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
>>> '/home/jovyan/notebooks/falk/F02.json'))
>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>

Reply via email to