Pool.map requires two arguments: the first is a function and the second is an iterable (a list, set, etc.). Check out the examples in the official docs for how to use it: https://docs.python.org/3/library/multiprocessing.html
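As a rough sketch of what that could look like in your case, assuming json_to_norm_with_null is split into a per-file helper (process_one_file below is just a placeholder name, and it reuses the paths, the spark session and the helper functions from your mail) so that there is an iterable of file paths to map over:

import os
from functools import partial
from multiprocessing.pool import ThreadPool
import multiprocessing as mp

def process_one_file(fullname, path_to_save):
    # hypothetical helper: does for a single file what json_to_norm_with_null
    # currently does for the whole directory
    df = spark.read.json(fullname)
    df = cleanDataFrame(df)
    df = flatten_test(df, sep=":")
    df.write.mode('append').option('compression', 'snappy') \
        .option("ignoreNullFields", "false").json(path_to_save)

if __name__ == "__main__":
    path = "/home/jovyan/notebooks/falk/test"
    files = [os.path.join(path, f) for f in os.listdir(path)
             if f.endswith('._stript_list.json')]
    pool = ThreadPool(mp.cpu_count())
    # first argument: the function, second argument: the iterable of inputs
    pool.map(partial(process_one_file,
                     path_to_save='/home/jovyan/notebooks/falk/test/test.json'),
             files)

And as noted earlier, if Spark is already configured to use all 16 cores, you may want local[1] or fewer threads so the threads and Spark do not oversubscribe the CPU.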
On Thu, 21 Jul 2022, 21:25 Bjørn Jørgensen, <bjornjorgen...@gmail.com> wrote:

> Thank you.
> The reason for using Spark local is to test the code, and as in this case
> I find the bottlenecks and fix them before I spin up a K8S cluster.
>
> I did test it now with 16 cores and 10 files:
>
> import time
>
> tic = time.perf_counter()
> json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
>                        '/home/jovyan/notebooks/falk/test/test.json')
> toc = time.perf_counter()
> print(f"Func run in {toc - tic:0.4f} seconds")
>
> Func run in 30.3695 seconds
>
> Then I stopped Spark and started it with setMaster('local[1]'), and now:
>
> Func run in 30.8168 seconds
>
> Which means it doesn't matter whether I run this code on one core or on a
> K8S cluster with 100 cores.
>
> So I tested the same with:
>
> from multiprocessing.pool import ThreadPool
> import multiprocessing as mp
>
>
> if __name__ == "__main__":
>     tic = time.perf_counter()
>     pool = ThreadPool(mp.cpu_count())
>     opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
>                    '/home/jovyan/notebooks/falk/test/test.json'))
>     toc = time.perf_counter()
>     print(f"Func run in {toc - tic:0.4f} seconds")
>
> I get the same files and they are OK.
> But I also get this error:
>
> TypeError                                 Traceback (most recent call last)
> Input In [33], in <cell line: 5>()
>       6 tic = time.perf_counter()
>       7 pool = ThreadPool(mp.cpu_count())
> ----> 8 opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test",
>                        '/home/jovyan/notebooks/falk/test/test.json'))
>       9 toc = time.perf_counter()
>      10 print(f"Func run in {toc - tic:0.4f} seconds")
>
> TypeError: Pool.map() missing 1 required positional argument: 'iterable'
>
> So any hints on what to change? :)
>
> Spark has the pandas-on-Spark API, and that is really great. I prefer the
> pandas-on-Spark API and PySpark over pandas.
>
> On Thu, 21 Jul 2022 at 09:18, Khalid Mammadov <khalidmammad...@gmail.com> wrote:
>
>> One quick observation is that you allocate all your local CPUs to Spark,
>> then execute that app with 10 threads, i.e. 10 Spark apps, so you would
>> need 160 cores in total as each will need 16 CPUs IMHO. Wouldn't that
>> create a CPU bottleneck?
>>
>> Also, on a side note, why do you need Spark if you use it locally only?
>> Spark's power can only (mainly) be observed in a cluster environment.
>> I have achieved great parallelism using pandas and pools on a local
>> machine in the past.
>>
>> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, <bjornjorgen...@gmail.com> wrote:
>>
>>> I have 400k JSON files, each between 10 kB and 500 kB in size.
>>> They don't have the same schema, so I have to loop over them one at a time.
>>>
>>> This works, but it's very slow. This process takes 5 days!
>>>
>>> So now I have tried to run these functions in a ThreadPool, but it
>>> doesn't seem to work.
>>>
>>>
>>> *Start local Spark. The system has 16 cores and 64 GB.*
>>>
>>> number_cores = int(multiprocessing.cpu_count())
>>>
>>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  # e.g. 4015976448
>>> memory_gb = int(mem_bytes / (1024.**3))  # e.g. 3.74
>>>
>>>
>>> def get_spark_session(app_name: str, conf: SparkConf):
>>>     conf.setMaster('local[{}]'.format(number_cores))
>>>     conf \
>>>         .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>>         .set("spark.sql.repl.eagerEval.enabled", "True") \
>>>         .set("spark.sql.adaptive.enabled", "True") \
>>>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
>>>         .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>>>
>>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>>
>>> spark = get_spark_session("Falk", SparkConf())
>>>
>>>
>>> *Function to rename columns with \\ *
>>>
>>> # We take a dataframe and return a new one with required changes
>>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>>>     # Returns a new sanitized field name (this function can be anything really)
>>>     def sanitizeFieldName(s: str) -> str:
>>>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>>>             .replace("[", "_").replace("]", "_").replace(".", "_")
>>>
>>>     # We call this on all fields to create a copy and to perform any
>>>     # changes we might want to do to the field.
>>>     def sanitizeField(field: StructField) -> StructField:
>>>         field = copy(field)
>>>         field.name = sanitizeFieldName(field.name)
>>>         # We recursively call cleanSchema on all types
>>>         field.dataType = cleanSchema(field.dataType)
>>>         return field
>>>
>>>     def cleanSchema(dataType: [DataType]) -> [DateType]:
>>>         dataType = copy(dataType)
>>>         # If the type is a StructType we need to recurse, otherwise we
>>>         # can return since we've reached the leaf node
>>>         if isinstance(dataType, StructType):
>>>             # We call our sanitizer for all top level fields
>>>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>>>         elif isinstance(dataType, ArrayType):
>>>             dataType.elementType = cleanSchema(dataType.elementType)
>>>         return dataType
>>>
>>>     # Now since we have the new schema we can create a new DataFrame by using
>>>     # the old Frame's RDD as data and the new schema as the schema for the data
>>>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>>>
>>>
>>> *Function to flatten out a nested dataframe.*
>>>
>>> from pyspark.sql.types import *
>>> from pyspark.sql.functions import *
>>>
>>>
>>> def flatten_test(df, sep="_"):
>>>     """Returns a flattened dataframe.
>>>     .. versionadded:: x.X.X
>>>
>>>     Parameters
>>>     ----------
>>>     sep : str
>>>         Delimiter for flattened columns. Default `_`
>>>
>>>     Notes
>>>     -----
>>>     Don't use `.` as `sep`.
>>>     It won't work on nested data frames with more than one level.
>>>     And you will have to use `columns.name`.
>>>
>>>     Flattening Map Types will have to find every key in the column.
>>>     This can be slow.
>>>
>>>     Examples
>>>     --------
>>>
>>>     data_mixed = [
>>>         {
>>>             "state": "Florida",
>>>             "shortname": "FL",
>>>             "info": {"governor": "Rick Scott"},
>>>             "counties": [
>>>                 {"name": "Dade", "population": 12345},
>>>                 {"name": "Broward", "population": 40000},
>>>                 {"name": "Palm Beach", "population": 60000},
>>>             ],
>>>         },
>>>         {
>>>             "state": "Ohio",
>>>             "shortname": "OH",
>>>             "info": {"governor": "John Kasich"},
>>>             "counties": [
>>>                 {"name": "Summit", "population": 1234},
>>>                 {"name": "Cuyahoga", "population": 1337},
>>>             ],
>>>         },
>>>     ]
>>>
>>>     data_mixed = spark.createDataFrame(data=data_mixed)
>>>
>>>     data_mixed.printSchema()
>>>
>>>     root
>>>      |-- counties: array (nullable = true)
>>>      |    |-- element: map (containsNull = true)
>>>      |    |    |-- key: string
>>>      |    |    |-- value: string (valueContainsNull = true)
>>>      |-- info: map (nullable = true)
>>>      |    |-- key: string
>>>      |    |-- value: string (valueContainsNull = true)
>>>      |-- shortname: string (nullable = true)
>>>      |-- state: string (nullable = true)
>>>
>>>     data_mixed_flat = flatten_test(df, sep=":")
>>>     data_mixed_flat.printSchema()
>>>     root
>>>      |-- shortname: string (nullable = true)
>>>      |-- state: string (nullable = true)
>>>      |-- counties:name: string (nullable = true)
>>>      |-- counties:population: string (nullable = true)
>>>      |-- info:governor: string (nullable = true)
>>>
>>>     data = [
>>>         {
>>>             "id": 1,
>>>             "name": "Cole Volk",
>>>             "fitness": {"height": 130, "weight": 60},
>>>         },
>>>         {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>>>         {
>>>             "id": 2,
>>>             "name": "Faye Raker",
>>>             "fitness": {"height": 130, "weight": 60},
>>>         },
>>>     ]
>>>
>>>     df = spark.createDataFrame(data=data)
>>>
>>>     df.printSchema()
>>>
>>>     root
>>>      |-- fitness: map (nullable = true)
>>>      |    |-- key: string
>>>      |    |-- value: long (valueContainsNull = true)
>>>      |-- id: long (nullable = true)
>>>      |-- name: string (nullable = true)
>>>
>>>     df_flat = flatten_test(df, sep=":")
>>>
>>>     df_flat.printSchema()
>>>
>>>     root
>>>      |-- id: long (nullable = true)
>>>      |-- name: string (nullable = true)
>>>      |-- fitness:height: long (nullable = true)
>>>      |-- fitness:weight: long (nullable = true)
>>>
>>>     data_struct = [
>>>         (("James", None, "Smith"), "OH", "M"),
>>>         (("Anna", "Rose", ""), "NY", "F"),
>>>         (("Julia", "", "Williams"), "OH", "F"),
>>>         (("Maria", "Anne", "Jones"), "NY", "M"),
>>>         (("Jen", "Mary", "Brown"), "NY", "M"),
>>>         (("Mike", "Mary", "Williams"), "OH", "M")
>>>     ]
>>>
>>>     schema = StructType([
>>>         StructField('name', StructType([
>>>             StructField('firstname', StringType(), True),
>>>             StructField('middlename', StringType(), True),
>>>             StructField('lastname', StringType(), True)
>>>         ])),
>>>         StructField('state', StringType(), True),
>>>         StructField('gender', StringType(), True)
>>>     ])
>>>
>>>     df_struct = spark.createDataFrame(data=data_struct, schema=schema)
>>>
>>>     df_struct.printSchema()
>>>
>>>     root
>>>      |-- name: struct (nullable = true)
>>>      |    |-- firstname: string (nullable = true)
>>>      |    |-- middlename: string (nullable = true)
>>>      |    |-- lastname: string (nullable = true)
>>>      |-- state: string (nullable = true)
>>>      |-- gender: string (nullable = true)
>>>
>>>     df_struct_flat = flatten_test(df_struct, sep=":")
>>>
>>>     df_struct_flat.printSchema()
>>>
>>>     root
>>>      |-- state: string (nullable = true)
>>>      |-- gender: string (nullable = true)
>>>      |-- name:firstname: string (nullable = true)
>>>      |-- name:middlename: string (nullable = true)
>>>      |-- name:lastname: string (nullable = true)
>>>     """
>>>     # compute Complex Fields (Arrays, Structs and MapTypes) in Schema
>>>     complex_fields = dict([(field.name, field.dataType)
>>>                            for field in df.schema.fields
>>>                            if type(field.dataType) == ArrayType
>>>                            or type(field.dataType) == StructType
>>>                            or type(field.dataType) == MapType])
>>>
>>>     while len(complex_fields) != 0:
>>>         col_name = list(complex_fields.keys())[0]
>>>         # print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))
>>>
>>>         # if StructType then convert all sub elements to columns,
>>>         # i.e. flatten structs
>>>         if (type(complex_fields[col_name]) == StructType):
>>>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>>>                         for k in [n.name for n in complex_fields[col_name]]]
>>>             df = df.select("*", *expanded).drop(col_name)
>>>
>>>         # if ArrayType then add the array elements as rows using the explode function,
>>>         # i.e. explode arrays
>>>         elif (type(complex_fields[col_name]) == ArrayType):
>>>             df = df.withColumn(col_name, explode_outer(col_name))
>>>
>>>         # if MapType then convert all sub elements to columns,
>>>         # i.e. flatten
>>>         elif (type(complex_fields[col_name]) == MapType):
>>>             keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
>>>             keys = list(map(lambda row: row[0], keys_df.collect()))
>>>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>>>                                 .alias(str(col_name + sep + f)), keys))
>>>             drop_column_list = [col_name]
>>>             df = df.select([col_name for col_name in df.columns
>>>                             if col_name not in drop_column_list] + key_cols)
>>>
>>>         # recompute remaining Complex Fields in Schema
>>>         complex_fields = dict([(field.name, field.dataType)
>>>                                for field in df.schema.fields
>>>                                if type(field.dataType) == ArrayType
>>>                                or type(field.dataType) == StructType
>>>                                or type(field.dataType) == MapType])
>>>
>>>     return df
>>>
>>>
>>> *Function to read each file, apply the functions, and save each file as JSON.*
>>>
>>> def json_to_norm_with_null(dir_path, path_to_save):
>>>     path = dir_path
>>>
>>>     for filename in os.listdir(path):
>>>         if not filename.endswith('._stript_list.json'):
>>>             continue
>>>
>>>         fullname = os.path.join(path, filename)
>>>         with open(fullname) as json_file:
>>>             jsonstr = json.load(json_file)
>>>
>>>         df = spark.read.json(fullname)
>>>         df = cleanDataFrame(df)
>>>         df = flatten_test(df, sep=":")
>>>         df.write.mode('append').option('compression', 'snappy') \
>>>             .option("ignoreNullFields", "false").json(path_to_save)
>>>
>>>
>>> *Function to start everything off. With hopefully 10 processes.*
>>>
>>> from multiprocessing.pool import ThreadPool
>>> tpool = ThreadPool(processes=10)
>>>
>>> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
>>>                                  '/home/jovyan/notebooks/falk/F02.json'))
>>>
>>>
>>> --
>>> Bjørn Jørgensen
>>> Vestre Aspehaug 4, 6010 Ålesund
>>> Norge
>>>
>>> +47 480 94 297
>>>
>>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>