Thanks Mich for the response,

We are using Spark DStream with Kafka Integration here.

This behavior is not observed on other environments where we run the same
application business logic, so we are not sure it is related to the business
logic itself.

One query: we have 1200 partitions on this topic and 120 executors processing
them. We also have Dynamic Resource Allocation (DRA) enabled here, so executors
scale up and down with load. Because of this scale up/down we also see delays
in processing, possibly because the topic partitions get rebalanced across
executors as consumers are added and removed.
Do you see this as a cause of the above problem? We tried disabling the
DRA and got better results, so just asking.
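
For reference, this is roughly how we pin the executor count when DRA is
disabled (a minimal sketch only; spark.streaming.dynamicAllocation.enabled is
our assumption for the streaming-side setting and worth verifying against the
Spark 3.5 docs):

    import org.apache.spark.SparkConf

    // Sketch: fix the executor count so Kafka topic partitions are not
    // rebalanced across a changing set of consumers.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "false")
      .set("spark.streaming.dynamicAllocation.enabled", "false") // assumed streaming-side flag
      .set("spark.executor.instances", "120")
      .set("spark.executor.cores", "10")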

Also, we need your thoughts on the design. Spark DStream works on batches that
are created based on the batch/polling interval (in our case 0.5 - 3 sec).
Until one batch completes, the next batch is not picked up for processing.
And if one executor is slow processing a task of a batch, the other executors,
even though they have finished their tasks, must wait for that executor to
complete before the next batch is picked up. Do you have any thoughts /
suggestions on this?
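
One option we are considering for the straggler case is speculative execution,
so a slow copy of a task is re-launched on another executor instead of holding
up the whole batch. A rough sketch (the thresholds are placeholders, not tuned
values):

    import org.apache.spark.SparkConf

    // Sketch: re-launch unusually slow tasks on another executor so one
    // straggler does not block the entire micro-batch.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.quantile", "0.9")   // placeholder: fraction of tasks that must finish first
      .set("spark.speculation.multiplier", "2.0") // placeholder: how much slower than the median counts as slow
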
Thanks & Regards,
Saurabh Agrawal
Cell: (+91) 9049700365


On Sun, Jan 26, 2025 at 8:12 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Ok let us have a look
>
> Your response is verbose. My assumption is that you are using Spark
> Structured Streaming (SSS) which is possibly causing the high CPU
> utilization for executors when processing messages. So SSS is used as a
> streaming ETL pipeline. It consumes messages from Kafka, performs some
> processing, and publishes the results to other Kafka topics.
>
> Observations
>
>
>    1. Small batches, high processing time: Smaller batches seem to take
>    longer to process compared to larger batches. This is unexpected for
>    streaming applications, where smaller batches are generally faster to
>    process.
>    2. Increased processing time over time: Even with consistent input
>    traffic, the processing time per batch increases over time, leading to lag
>    buildup in Kafka.
>    3. High CPU usage for some executors: Not all executors experience
>    high CPU utilization, suggesting some tasks within the batches might be
>    causing delays.
>
> From my experience, these observations point towards potential issues with
> the Spark application's logic or configuration.
>
>
>    - Complex Transformations: If the business logic applied to messages
>    within SSS is complex, it could lead to increased processing time,
>    especially for smaller batches. This might explain why smaller batches take
>    longer than expected.
>    - State Management: Spark Structured Streaming can maintain state
>    information across micro-batches. If the state management is not optimized,
>    it could lead to performance degradation over time, especially when dealing
>    with continuous data streams.
>    - Skewed Data or Slow Tasks: If there's skewed data distribution or
>    specific tasks within the processing logic that are slow, it could overload
>    some executors while others remain idle. This could explain the uneven CPU
>    utilization across executors.
>    - Network Timeouts: The document mentions Kafka connection timeout
>    exceptions during message publishing. These timeouts could further
>    contribute to increased processing time and lag build-up.
>
> While the document does not explicitly mention Spark Structured Streaming,
> the characteristics of the application and the observed behaviour strongly
> suggest its involvement. The high CPU utilization for executors is likely a
> consequence of the application logic or configuration issues within SSS
> that lead to slow processing of messages.
>
> My suggestions
>
>    - Analyze the business logic applied to messages within SSS to
>    identify any potential bottlenecks or inefficiencies. Consider simplifying
>    complex transformations or optimizing state management if necessary. Use
>    the Spark UI on port 4040 to spot issues such as slow or skewed tasks
>    that might be overloading specific executors (a rough skew check is
>    sketched after this list). Optimize these tasks or redistribute them for
>    better load balancing.
>    - Investigate Network Timeouts: Address the root cause of the Kafka
>    connection timeout exceptions to prevent delays during message publishing.
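>
> If you happen to be on the older DStream API, a rough way to check skew at
> the Kafka partition level is something like the sketch below (the stream
> variable and the numbers are illustrative only):
>
>     // Sketch: log per-partition record counts for each micro-batch to see
>     // whether a few partitions carry most of the traffic.
>     stream.foreachRDD { rdd =>
>       val counts = rdd
>         .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
>         .collect()
>       counts.sortBy(-_._2).take(10).foreach { case (p, n) =>
>         println(s"partition $p -> $n records")
>       }
>     }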
>
> HTH
>
> Mich Talebzadeh,
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
>
>
> On Sat, 25 Jan 2025 at 13:31, Saurabh Agrawal <agrawalsaurabh...@gmail.com>
> wrote:
>
>>
>>
>>
>>
>> *Hi Team,*
>>
>> *I have been using the Spark v3.5 Spark Streaming functionality for the
>> below use case. I am observing the issue below on one of the environments
>> with Spark Streaming. Any assistance with this would be of valuable help.*
>>
>> *Use case:*
>>
>> Smart uses Spark Streaming to consume messages from one source topic,
>> applies business logic to filter out some of these messages, and publishes
>> them accordingly to the target topics.
>>
>>
>>
>> *Initial Configuration:*
>>
>> Kafka Source topic: 24 brokers, 400 partitions
>>
>> Kafka Target topic: 200 partitions
>>
>> executorsInstances: 15
>>
>> executors cores: 3
>>
>> Input rate: 200-250 messages/sec
>>
>> BackPressure=true
>>
>> BackPressureInitialRate: 200000 (default in pom)
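>>
>> For clarity, these settings map roughly to the following Spark properties
>> (a sketch of our assumed mapping, since the names above come from our own
>> application configuration):
>>
>>     import org.apache.spark.SparkConf
>>
>>     // Sketch: the initial setup expressed as standard Spark properties.
>>     val conf = new SparkConf()
>>       .set("spark.executor.instances", "15")
>>       .set("spark.executor.cores", "3")
>>       .set("spark.streaming.backpressure.enabled", "true")
>>       .set("spark.streaming.backpressure.initialRate", "200000")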
>>
>>
>>
>>
>>
>> Now there is a requirement that a ban refresh will occasionally be
>> triggered, which produces messages at 800-1000 messages/sec.
>>
>>
>> *New Configuration done:*
>>
>>
>>
>> *Stage 1:*
>>
>> Kafka Source topic: 24 brokers, 1200 partitions
>>
>> Kafka Target topic: 800 partitions
>>
>> executorsInstances: 120
>>
>> executors cores: 10
>>
>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>
>> BackPressure=true
>>
>> BackPressureInitialRate: 100
>>
>> Spark batch schedule time – 0.5 sec
>>
>> maxRatePerPartition: 1000
>>
>>
>>
>>
>>
>> All 120 executors come up at startup due to the lag already piled up
>> on Kafka.
>>
>>
>>
>> Problem:
>>
>>    1. When there is already a huge lag in the system, then on application
>>    startup, due to the backpressure initial rate, only 1 message is picked
>>    up from each partition, and hence the batch size in the Spark stream is
>>    only 1200 records.
>>    2. Over a period of time these delays get cleared, and the system
>>    works fine without scheduling delays for a few hours, with incoming
>>    traffic consistent at 100-200 tps. At this point the batch size drops to
>>    50-100 records/batch.
>>    3. Then a ban refresh suddenly gets triggered and the incoming rate
>>    increases to 800-1000 tps. Now the executors are not able to cope and
>>    the lag keeps increasing on the Kafka side. The processing rate of
>>    batches keeps dropping, which builds up scheduling delay and hence
>>    reduces the batch size to almost 15-20 records/batch.
>>    4. Our observation is that small batches take more processing time than
>>    big batches.
>>    5. Another issue: NETWORK_CONNECTION errors when fetching metadata from
>>    Kafka while publishing.
>>
>>
>>
>>
>>
>> Solution:
>>
>>    1. Removed the initial rate parameter of backpressure, so that when
>>    there are millions of records already piled up on Kafka, the batches
>>    created will have more records/batch than the original 1200 records/batch.
>>    This is because batches with more records seem to have better processing
>>    throughput than small batches.
>>    2. Also disabled backpressure (though not ideal), so we have a consistent
>>    records/batch size to deal with and can improve processing time.
>>    3. Handled the NETWORK_CONNECTION issue.
>>
>>
>>
>>
>>
>> *Stage 2:*
>>
>> Kafka Source topic: 24 brokers, 1200 partitions
>>
>> Kafka Target topic: 800 partitions
>>
>> executorsInstances: 120
>>
>> executors cores: 10
>>
>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>
>> BackPressure=false
>>
>> maxRatePerPartition: 1000
>>
>> Spark batch schedule time – 0.5 sec
>>
>>
>>
>> What went well:
>>
>>    1. Lags already accumulated get released faster with higher records
>>    per batch (in millions).
>>    2. NETWORK_CONNECTION issue resolved
>>
>>
>>
>> Problems persist:
>>
>>    1. Once the lag is cleared and input traffic is consistent, after a
>>    period of time the processing time still keeps increasing and lag starts
>>    to build up on Kafka.
>>    2. At times of ban refresh, when traffic increases, the lag keeps
>>    accumulating.
>>    3. When the batches have many records, in the initial stage the
>>    processing rate from Smart is 2k-4k messages per sec. But over time, when
>>    the incoming data flow is lower (200-400 messages/sec) and batches are
>>    ~300 records, the processing rate drops to 100-200 messages per sec.
>>    4. This also leads to an increase in lag when we have high load, as the
>>    processing rate is reduced.
>>    5. *Not all executors show high CPU usage; some run at high CPU and
>>    some at low CPU.*
>>
>> Solution:
>>
>>    1. As batches with more records have better processing throughput (even
>>    though the total time per batch is higher), increase the batch interval
>>    from 0.5 sec to 3 sec so that batches of 600-700 records/batch get
>>    created (see the sketch after this list).
>>    2. Reduced executorsInstances to 40 to see the effect on CPU utilization
>>    and processing.
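>>
>> For reference, the batch interval is the Duration passed to the
>> StreamingContext; a minimal sketch of where the 3 sec value sits (topic
>> name, Kafka parameters and variable names are illustrative, not our actual
>> code):
>>
>>     import org.apache.spark.SparkConf
>>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>>     import org.apache.spark.streaming.kafka010._
>>     import org.apache.kafka.common.serialization.StringDeserializer
>>
>>     val conf = new SparkConf().setAppName("smart-stream") // illustrative name
>>     // 3-second batch interval: one micro-batch is created every 3 sec
>>     val ssc = new StreamingContext(conf, Seconds(3))
>>
>>     val kafkaParams = Map[String, Object](
>>       "bootstrap.servers" -> "broker1:9092",              // illustrative
>>       "key.deserializer" -> classOf[StringDeserializer],
>>       "value.deserializer" -> classOf[StringDeserializer],
>>       "group.id" -> "smart-consumer-group"                // illustrative
>>     )
>>
>>     // Direct stream: one RDD partition per Kafka topic partition
>>     val stream = KafkaUtils.createDirectStream[String, String](
>>       ssc,
>>       LocationStrategies.PreferConsistent,
>>       ConsumerStrategies.Subscribe[String, String](Seq("source-topic"), kafkaParams)
>>     )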
>>
>>
>>
>>
>>
>> *Stage 3:*
>>
>> Kafka Source topic: 24 brokers, 1200 partitions
>>
>> Kafka Target topic: 800 partitions
>>
>> executorsInstances: 40
>>
>> executors cores: 10
>>
>> Input rate: 200-250 messages/sec, spiking to 800-1000 messages/sec
>>
>> BackPressure=false
>>
>> maxRatePerPartition: 1000
>>
>> Spark batch polling time – 3 sec
>>
>>
>>
>> What went well:
>>
>>    1. Lags already accumulated get released faster with higher records
>>    per batch (in millions).
>>    2. With the 3 sec polling time, fewer batches get created, with more
>>    records per batch.
>>
>>
>>
>> Problems persist:
>>
>>    1. As we increase the polling time of batches, when the load is lower
>>    and the *processing time/batch duration* ratio eventually drops, some
>>    executors scale down due to Dynamic Resource Allocation being enabled on
>>    Smart.
>>    2. Once again, when the input load increases, new executors do scale
>>    up, but we do not see them reach high CPU usage like the other executors.
>>    3. *We also see heartbeat interval exceptions on the driver-executor
>>    connection for some executors (a config sketch follows this list).*
>>    4. *Some of the tasks in a batch take 30 sec or more on one of the
>>    executors, impacting the overall processing time and delaying the next
>>    batch. This leads to increased lag on Kafka.*
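>>
>> On the heartbeat exceptions, the usual knobs are the executor heartbeat
>> interval and the overall network timeout; a sketch only, and the values are
>> placeholders rather than recommendations:
>>
>>     import org.apache.spark.SparkConf
>>
>>     // Sketch: give busy executors more headroom before a missed heartbeat
>>     // is treated as a failure by the driver.
>>     val conf = new SparkConf()
>>       .set("spark.executor.heartbeatInterval", "20s") // placeholder; keep well below spark.network.timeout
>>       .set("spark.network.timeout", "300s")           // placeholder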
>>
>>
>>
>> Reason for connection timeout:
>>
>>    1. As for the 30 sec tasks in a batch: we checked the logs and found
>>    some Kafka connection timeout exceptions happening on publish in these
>>    executors, which delay the processing time of the overall batch. We need
>>    to resolve this.
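>>
>> A sketch of the producer-side timeout settings that are usually involved in
>> such publish timeouts (values are placeholders; the real fix may equally be
>> on the broker or network side):
>>
>>     import java.util.Properties
>>     import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
>>     import org.apache.kafka.common.serialization.StringSerializer
>>
>>     // Sketch: give the publish path more slack before a metadata/connection
>>     // timeout fails the task and stalls the whole batch.
>>     val props = new Properties()
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092") // illustrative
>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
>>     props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")   // placeholder
>>     props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "180000") // placeholder
>>     props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "120000")        // placeholder: metadata fetch wait
>>     val producer = new KafkaProducer[String, String](props)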
>>
>>
>>
>>
>>
>>
>>
>> Overall memory utilization was not found to cause any issue. CPU
>> utilization does go high for executors.
>>
>>
>>
>> Looking forward to your response.
>> Thanks & Regards,
>> Saurabh Agrawal
>> Cell: (+91) 9049700365
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
