Thanks again, Mich. It gives a clear picture, but I again have a couple of doubts:
1) I understand that multiple threads will be run, each loading a segment of size 10, until the upper bound is reached, but I didn't quite follow this part of the code:

segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

2) Also, performing a union on these small DataFrames won't hurt performance, right? Since Spark has to shuffle and combine less data from these DataFrames?
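
Just to check my reading of that list comprehension, with small made-up numbers (lower_bound = 0, upper_bound = 25, segment_size = 10, not the real values) I would expect it to evaluate like this:

# range(0, 25, 10) yields 0, 10, 20, so we get one (lower, upper) pair per
# segment, with the last pair clipped to the upper bound
segments = [(i, min(i + 10, 25)) for i in range(0, 25, 10)]
# -> [(0, 10), (10, 20), (20, 25)]

Is that the correct interpretation?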

On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Well, you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and using it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate the DataFrames from the multiple threads. When
> loading data in parallel, each thread will load a segment of data into its
> own DataFrame. To aggregate all these DataFrames into a single DataFrame,
> you can use the union method in PySpark.
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
>     .appName("MongoDBDataLoad") \
>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>     .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_base.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             all_dfs.append(df_segment)
>         except Exception as e:
>             print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
>     final_df = all_dfs[0]
>     for df in all_dfs[1:]:
>         final_df = final_df.union(df)
>
>     # Proceed with your final DataFrame
>     final_df.show()
>
> HTH
>
> Mich Talebzadeh
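
Regarding the union step above, I was thinking of writing it a bit more compactly as below; this is only a sketch on my side and assumes every segment DataFrame comes back with the same schema (all_dfs is the list built in your snippet):

from functools import reduce
from pyspark.sql import DataFrame

# unionByName matches columns by name rather than by position; union/unionByName
# just concatenate the logical plans, so the combine step itself does not shuffle.
if all_dfs:
    final_df = reduce(DataFrame.unionByName, all_dfs)
    final_df.show()

I assume that is equivalent to the loop version?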

> On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:
>
>> Thanks, Mich, for your response. However, I have a couple of doubts:
>>
>> 1) I am trying to load the data for an incremental batch, so I am not sure
>> what my upper bound would be. What can we do about that?
>>
>> 2) As each thread loads its segment of data into a DataFrame, what should I
>> do if I want to aggregate all the data from all the threads into a single
>> DataFrame? Keep appending to one DataFrame as the segments come in?
>>
>> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Yes, partitioning and parallel loading can significantly improve the
>>> performance of data extraction from JDBC sources or databases like MongoDB.
>>> This approach leverages Spark's distributed computing capabilities, allowing
>>> you to load data in parallel and thus speeding up the overall data loading
>>> process.
>>>
>>> When loading data from JDBC sources, specifying partitioning options allows
>>> Spark to parallelize the read operation. Here's how you can do it for a JDBC
>>> source; something like below, given the information provided:
>>>
>>> from pyspark.sql import SparkSession
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>>     df = spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>>     return df
>>>
>>> # MongoDB configuration
>>> mongo_config_template = {
>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>     "partitionColumn": "_id",
>>>     "lowerBound": None,
>>>     "upperBound": None
>>> }
>>>
>>> lower_bound = 0
>>> upper_bound = 200
>>> segment_size = 10
>>>
>>> # Create segments
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>>
>>> # Initialize Spark session
>>> spark = SparkSession.builder \
>>>     .appName("MongoDBDataLoad") \
>>>     .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
>>>     .getOrCreate()
>>>
>>> # Extract data in parallel using ThreadPoolExecutor
>>> def load_segment(segment):
>>>     segment_lower_bound, segment_upper_bound = segment
>>>     mongo_config = mongo_config_template.copy()
>>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>>     return extract_data_from_mongodb(mongo_config)
>>>
>>> with ThreadPoolExecutor() as executor:
>>>     futures = [executor.submit(load_segment, segment) for segment in segments]
>>>     for future in as_completed(futures):
>>>         try:
>>>             df_segment = future.result()
>>>             # Process df_segment as needed
>>>         except Exception as e:
>>>             print(f"Error: {e}")
>>>
>>> ThreadPoolExecutor enables parallel execution of tasks using multiple
>>> threads. Each thread can be responsible for loading a segment of the data.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh
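
For the plain JDBC source case mentioned above, my understanding is that the equivalent partitioned read would look roughly like the sketch below; the URL, table name and credentials here are placeholders I made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JdbcPartitionedRead").getOrCreate()

# Spark splits the [lowerBound, upperBound) range on partitionColumn into
# numPartitions parallel queries; the bounds only decide the partition stride,
# they do not filter rows. partitionColumn must be numeric, date or timestamp.
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://host:5432/database")  # placeholder
           .option("dbtable", "my_table")                          # placeholder
           .option("user", "username")
           .option("password", "password")
           .option("partitionColumn", "id")
           .option("lowerBound", "0")
           .option("upperBound", "200")
           .option("numPartitions", "20")
           .load())

Is that the right equivalent on the JDBC side?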

>>> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hello experts,
>>>>
>>>> I was just wondering if I could leverage the approach below to expedite the
>>>> data loading process in Spark.
>>>>
>>>> def extract_data_from_mongodb(mongo_config):
>>>>     df = glueContext.create_dynamic_frame.from_options(
>>>>         connection_type="mongodb",
>>>>         connection_options=mongo_config
>>>>     )
>>>>     return df
>>>>
>>>> lower_bound = 0
>>>> upper_bound = 200
>>>> segment_size = 10
>>>>
>>>> mongo_config = {
>>>>     "connection.uri": "mongodb://url",
>>>>     "database": "",
>>>>     "collection": "",
>>>>     "username": "",
>>>>     "password": "",
>>>>     "partitionColumn": "_id",
>>>>     "lowerBound": str(lower_bound),
>>>>     "upperBound": str(upper_bound)
>>>> }
>>>>
>>>> segments = [(i, min(i + segment_size, upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]
>>>>
>>>> with ThreadPoolExecutor() as executor:
>>>>     futures = [executor.submit(execution, segment) for segment in segments]
>>>>     for future in as_completed(futures):
>>>>         try:
>>>>             future.result()
>>>>         except Exception as e:
>>>>             print(f"Error: {e}")
>>>>
>>>> I am trying to leverage parallel threads to pull the data in parallel, so
>>>> is it effective?
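
One more note on my original snippet above: the execution function referenced there isn't shown; what I have in mind is roughly the per-segment wrapper sketched below. load_segment is an illustrative name of mine, and it reuses mongo_config, segments and extract_data_from_mongodb from the snippet above:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Illustrative per-segment wrapper: copy the base config, fill in this
# segment's bounds and call the Glue reader defined above.
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    config = dict(mongo_config)
    config["lowerBound"] = str(segment_lower_bound)
    config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
        except Exception as e:
            print(f"Error: {e}")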