Yes, partitioning and parallel loading can significantly improve the performance of data extraction from JDBC sources or databases like MongoDB. This approach leverages Spark's distributed computing capabilities to load data in parallel, speeding up the overall loading process.
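For a plain JDBC source this is built in: once you supply partitionColumn, lowerBound, upperBound and numPartitions, Spark splits the column range into sub-ranges and reads them as separate tasks. A minimal sketch, assuming a numeric id column; the URL, table name and credentials below are placeholders rather than details from this thread:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JDBCPartitionedRead").getOrCreate()

# Placeholder connection details -- replace with the real ones
jdbc_df = (spark.read.format("jdbc")
           .option("url", "jdbc:postgresql://host:5432/database")
           .option("dbtable", "schema.table")
           .option("user", "username")
           .option("password", "password")
           .option("partitionColumn", "id")   # numeric, date or timestamp column
           .option("lowerBound", "0")         # lowest expected value of partitionColumn
           .option("upperBound", "200")       # highest expected value of partitionColumn
           .option("numPartitions", "20")     # number of parallel read tasks
           .load())

Each of the numPartitions sub-ranges becomes its own read task, so the scan runs concurrently across the executors.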
When loading data from JDBC sources, specifying partitioning options like these allows Spark to parallelize the data read operation. You can apply the same idea to MongoDB by segmenting the data yourself and loading the segments in parallel. Something like below, given the information provided:

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri", "mongodb://username:password@host:port/database.collection") \
    .getOrCreate()

# MongoDB configuration template; lowerBound/upperBound are filled in per segment
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create (lower, upper) segments covering the full range
segments = [(i, min(i + segment_size, upper_bound))
            for i in range(lower_bound, upper_bound, segment_size)]

def extract_data_from_mongodb(mongo_config):
    # Read one segment of the collection with the MongoDB Spark connector
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .options(**mongo_config) \
        .load()
    return df

def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

# Extract the segments in parallel using ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")

ThreadPoolExecutor enables parallel execution of tasks using multiple threads; each thread is responsible for loading one segment of the data.
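If the downstream logic needs a single DataFrame rather than per-segment results, you can collect the segment DataFrames and union them once all futures have completed. A small sketch, assuming every segment comes back with the same schema (segment_dfs and full_df are illustrative names, not from the original message):

from functools import reduce

segment_dfs = []
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in segments]
    for future in as_completed(futures):
        segment_dfs.append(future.result())   # collect each segment's DataFrame

# Union the segments back into a single DataFrame
full_df = reduce(lambda left, right: left.unionByName(right), segment_dfs)

Note that the threads only submit the reads and wait for Spark to finish them; the actual scanning still happens on the executors, the threads just keep several reads in flight at once.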
HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy>
Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
> def extract_data_from_mongodb(mongo_config):
>     df = glueContext.create_dynamic_frame.from_options(
>         connection_type="mongodb",
>         connection_options=mongo_config
>     )
>     return df
>
> lower_bound = 0
> upper_bound = 200
> segment_size = 10
>
> mongo_config = {
>     "connection.uri": "mongodb://url",
>     "database": "",
>     "collection": "",
>     "username": "",
>     "password": "",
>     "partitionColumn": "_id",
>     "lowerBound": str(lower_bound),
>     "upperBound": str(upper_bound)
> }
>
> segments = [(i, min(i + segment_size, upper_bound))
>             for i in range(lower_bound, upper_bound, segment_size)]
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(execution, segment) for segment in segments]
>     for future in as_completed(futures):
>         try:
>             future.result()
>         except Exception as e:
>             print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?