Thanks again Mich. It gives the clear picture but I have again couple of
doubts:

1) I know that there will be multiple threads that will be executed with 10
segment sizes each until the upper bound is reached but I didn't get this
part of the code exactly segments = [(i, min(i + segment_size,
upper_bound)) for i in range(lower_bound, upper_bound, segment_size)]

2) Also performing union on these small dataframes won't impact performance
right? since spark has to shuffle and combine less data from these
dataframes?


On Thu, Jun 6, 2024 at 3:53 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> well you can dynamically determine the upper bound by first querying the
> database to find the maximum value of the partition column and use it as
> the upper bound for your partitioning logic.
>
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # Define your MongoDB config without the bounds first
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> Then you need to aggregate DF from multiple threads When loading data in
> parallel, each thread will load a segment of data into its own DataFrame.
> To aggregate all these DataFrames into a single DataFrame, you can use t*he
> union method in PySpark.*
>
> from concurrent.futures import ThreadPoolExecutor, as_completed
> from pyspark.sql import SparkSession
>
> def extract_data_from_mongodb(mongo_config):
>     df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     return df
>
> # Function to get the maximum value of the partition column
> def get_max_value(spark, mongo_config, column_name):
>     max_value_df =
> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>     max_value = max_value_df.agg({column_name: "max"}).collect()[0][0]
>     return max_value
>
> # MongoDB configuration without bounds
> mongo_config_base = {
>     "uri": "mongodb://username:password@host:port/database.collection",
>     "partitionColumn": "_id"
> }
>
> # Initialize Spark session
> spark = SparkSession.builder \
>     .appName("MongoDBDataLoad") \
>     .config("spark.mongodb.input.uri", 
> "mongodb://username:password@host:port/database.collection")
> \
>     .getOrCreate()
>
> # Fetch the dynamic upper bound
> upper_bound = get_max_value(spark, mongo_config_base, "_id")
>
> # Define your segment size
> segment_size = 10
> lower_bound = 0
> segments = [(i, min(i + segment_size, upper_bound)) for i in
> range(lower_bound, upper_bound, segment_size)]
>
> # Function to load a segment
> def load_segment(segment):
>     segment_lower_bound, segment_upper_bound = segment
>     mongo_config = mongo_config_base.copy()
>     mongo_config["lowerBound"] = str(segment_lower_bound)
>     mongo_config["upperBound"] = str(segment_upper_bound)
>     return extract_data_from_mongodb(mongo_config)
>
> # Collect all DataFrames from threads
> all_dfs = []
>
> with ThreadPoolExecutor() as executor:
>     futures = [executor.submit(load_segment, segment) for segment in
> segments]
>     for future in as_completed(futures):
>         try:
>             df_segment = future.result()
>             all_dfs.append(df_segment)
>         except Exception as e:
>             print(f"Error: {e}")
>
> # Union all DataFrames into a single DataFrame
> if all_dfs:
>     final_df = all_dfs[0]
>     for df in all_dfs[1:]:
>         final_df = final_df.union(df)
>
> # Proceed with your final DataFrame
> final_df.show()
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Thu, 6 Jun 2024 at 10:52, Perez <flinkbyhe...@gmail.com> wrote:
>
>> Thanks, Mich for your response. However, I have multiple doubts as below:
>>
>> 1) I am trying to load the data for the incremental batch so I am not
>> sure what would be my upper bound. So what can we do?
>> 2) So as each thread loads the desired segment size's data into a
>> dataframe if I want to aggregate all the data from all the threads in a
>> single dataframe what should I do? Keep on appending in a dataframe as it
>> comes?
>>
>>
>> On Thu, Jun 6, 2024 at 1:54 PM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Yes, partitioning and parallel loading can significantly improve the
>>> performance of data extraction from JDBC sources or databases like MongoDB.
>>> This approach can leverage Spark's distributed computing capabilities,
>>> allowing you to load data in parallel, thus speeding up the overall data
>>> loading process.
>>>
>>> When loading data from JDBC sources, specifying partitioning options
>>> allows Spark to parallelize the data read operation. Here's how you can do
>>> it for a JDBC source:
>>>
>>> Something like below given the information provided
>>>
>>> from pyspark.sql import SparkSession
>>> from concurrent.futures import ThreadPoolExecutor, as_completed
>>>
>>> def extract_data_from_mongodb(mongo_config):
>>>     df =
>>> spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
>>>     return df
>>>
>>> # MongoDB configuration
>>> mongo_config_template = {
>>>     "uri": "mongodb://username:password@host:port/database.collection",
>>>     "partitionColumn": "_id",
>>>     "lowerBound": None,
>>>     "upperBound": None
>>> }
>>>
>>> lower_bound = 0
>>> upper_bound = 200
>>> segment_size = 10
>>>
>>> # Create segments
>>> segments = [(i, min(i + segment_size, upper_bound)) for i in
>>> range(lower_bound, upper_bound, segment_size)]
>>>
>>> # Initialize Spark session
>>> spark = SparkSession.builder \
>>>     .appName("MongoDBDataLoad") \
>>>     .config("spark.mongodb.input.uri", 
>>> "mongodb://username:password@host:port/database.collection")
>>> \
>>>     .getOrCreate()
>>>
>>> # Extract data in parallel using ThreadPoolExecutor
>>> def load_segment(segment):
>>>     segment_lower_bound, segment_upper_bound = segment
>>>     mongo_config = mongo_config_template.copy()
>>>     mongo_config["lowerBound"] = str(segment_lower_bound)
>>>     mongo_config["upperBound"] = str(segment_upper_bound)
>>>     return extract_data_from_mongodb(mongo_config)
>>>
>>> with ThreadPoolExecutor() as executor:
>>>     futures = [executor.submit(load_segment, segment) for segment in
>>> segments]
>>>     for future in as_completed(futures):
>>>         try:
>>>             df_segment = future.result()
>>>             # Process df_segment as needed
>>>         except Exception as e:
>>>             print(f"Error: {e}")
>>>
>>>
>>> ThreadPoolExecutor enables parallel execution of tasks using multiple
>>> threads. Each thread can be responsible for loading a segment of the data.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London> 
>>> (voted
>>> 2nd best university in the world after MIT https://lnkd.in/eCPt6KTj)
>>> London, United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hello experts,
>>>>
>>>> I was just wondering if I could leverage the below thing to expedite
>>>> the loading of the data process in Spark.
>>>>
>>>>
>>>> def extract_data_from_mongodb(mongo_config): df =
>>>> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
>>>> connection_options=mongo_config ) return df
>>>>
>>>> mongo_config = { "connection.uri": "mongodb://url", "database": "",
>>>> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
>>>> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
>>>> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i,
>>>> min(i + segment_size, upper_bound)) for i in range(lower_bound,
>>>> upper_bound, segment_size)] with ThreadPoolExecutor() as executor: futures
>>>> = [executor.submit(execution, segment) for segment in segments] for future
>>>> in as_completed(futures): try: future.result() except Exception as e:
>>>> print(f"Error: {e}")
>>>>
>>>> I am trying to leverage the parallel threads to pull data in parallel.
>>>> So is it effective?
>>>>
>>>

Reply via email to