Yes, partitioning and parallel loading can significantly improve the
performance of data extraction from JDBC sources or databases like MongoDB.
This approach can leverage Spark's distributed computing capabilities,
allowing you to load data in parallel, thus speeding up the overall data
loading process.

When loading data from JDBC sources, specifying partitioning options allows
Spark to parallelize the data read operation. Here's how you can do it for
a JDBC source:

Something like below given the information provided

from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed

def extract_data_from_mongodb(mongo_config):
    df =
spark.read.format("com.mongodb.spark.sql.DefaultSource").options(**mongo_config).load()
    return df

# MongoDB configuration
mongo_config_template = {
    "uri": "mongodb://username:password@host:port/database.collection",
    "partitionColumn": "_id",
    "lowerBound": None,
    "upperBound": None
}

lower_bound = 0
upper_bound = 200
segment_size = 10

# Create segments
segments = [(i, min(i + segment_size, upper_bound)) for i in
range(lower_bound, upper_bound, segment_size)]

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoDBDataLoad") \
    .config("spark.mongodb.input.uri",
"mongodb://username:password@host:port/database.collection")
\
    .getOrCreate()

# Extract data in parallel using ThreadPoolExecutor
def load_segment(segment):
    segment_lower_bound, segment_upper_bound = segment
    mongo_config = mongo_config_template.copy()
    mongo_config["lowerBound"] = str(segment_lower_bound)
    mongo_config["upperBound"] = str(segment_upper_bound)
    return extract_data_from_mongodb(mongo_config)

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(load_segment, segment) for segment in
segments]
    for future in as_completed(futures):
        try:
            df_segment = future.result()
            # Process df_segment as needed
        except Exception as e:
            print(f"Error: {e}")


ThreadPoolExecutor enables parallel execution of tasks using multiple
threads. Each thread can be responsible for loading a segment of the data.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London> (voted 2nd
best university in the world after MIT https://lnkd.in/eCPt6KTj)
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 6 Jun 2024 at 00:46, Perez <flinkbyhe...@gmail.com> wrote:

> Hello experts,
>
> I was just wondering if I could leverage the below thing to expedite the
> loading of the data process in Spark.
>
>
> def extract_data_from_mongodb(mongo_config): df =
> glueContext.create_dynamic_frame.from_options( connection_type="mongodb",
> connection_options=mongo_config ) return df
>
> mongo_config = { "connection.uri": "mongodb://url", "database": "",
> "collection": "", "username": "", "password": "", "partitionColumn":"_id",
> "lowerBound": str(lower_bound), "upperBound": str(upper_bound) }
> lower_bound = 0 upper_bound = 200 segment_size = 10 segments = [(i, min(i
> + segment_size, upper_bound)) for i in range(lower_bound, upper_bound,
> segment_size)] with ThreadPoolExecutor() as executor: futures =
> [executor.submit(execution, segment) for segment in segments] for future in
> as_completed(futures): try: future.result() except Exception as e:
> print(f"Error: {e}")
>
> I am trying to leverage the parallel threads to pull data in parallel. So
> is it effective?
>

Reply via email to