Thanks Saurabh for the clarification. You are describing a common challenge in distributed systems like Spark Streaming: stragglers. These are slow-running tasks that disproportionately affect the overall run time of a job. In Spark Streaming, a single slow executor working on a batch can hold up the entire stream processing pipeline, because the next batch is not scheduled until the current one completes. Let me think about your design and come back to you.
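In the meantime, one knob worth trying for stragglers is speculative execution, which re-launches unusually slow task attempts on other executors. A minimal sketch, assuming duplicate task attempts (and therefore possible duplicate publishes to Kafka) are acceptable or handled downstream; the thresholds are illustrative, not tuned for your workload:

    import org.apache.spark.SparkConf

    // Sketch only: re-run straggling task attempts speculatively.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.multiplier", "3")  // a task counts as "slow" at 3x the median duration
      .set("spark.speculation.quantile", "0.9")  // only after 90% of tasks in the stage have finished

Whichever attempt finishes first wins and the other is killed, so this only helps when the slowness is node-local (GC pauses, a noisy neighbour) rather than inherent to the data.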
HTH

Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


On Mon, 27 Jan 2025 at 07:18, Saurabh Agrawal <agrawalsaurabh...@gmail.com> wrote:

Thanks Mich for the response.

We are using Spark DStream with Kafka integration here.

This behaviour is not observed on other environments running the same application business logic, so I am not sure whether it is related to the applicative business logic at all.

One query: the topic has 1200 partitions and 120 executors processing it, and Dynamic Resource Allocation (DRA) is enabled, so executors scale up and down with load. We suspect this scale up/down also contributes to the processing delay, possibly because the topic partitions are redistributed over the executors as consumers are added and removed. Do you see this as a cause of the problem above? We tried disabling DRA and saw better results, hence the question.

Also, I need your thoughts on the design. Spark DStream works on batches that are created according to the polling time (in our case 0.5-3 sec). Until one batch completes, the next batch is not picked up for processing. So if one executor is slow on a task of a batch, the other executors that have already finished their tasks have to wait for it before the next batch can be processed. Do you have any thoughts or suggestions on this?

Thanks & Regards,
Saurabh Agrawal
Cell: (+91) 9049700365
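If the rebalancing on scale up/down is indeed the cause, one option short of disabling DRA is to pin a floor of executors and lengthen the idle timeout so the allocation changes less often. A minimal sketch with illustrative values only (DRA normally also needs the external shuffle service so executors can be removed safely):

    import org.apache.spark.SparkConf

    // Sketch only: the numbers below are assumptions, not tuned recommendations.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "40")           // keep a floor to limit churn
      .set("spark.dynamicAllocation.maxExecutors", "120")
      .set("spark.dynamicAllocation.executorIdleTimeout", "300s")  // scale down less aggressively

For DStream jobs there is also a streaming-specific variant (spark.streaming.dynamicAllocation.enabled) that scales on batch processing time rather than task backlog; I mention it only as something to evaluate, not as a drop-in fix.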
On Sun, Jan 26, 2025 at 8:12 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Ok, let us have a look.

Your response is verbose. My assumption is that you are using Spark Structured Streaming (SSS), which is possibly causing the high CPU utilization for executors when processing messages. So SSS is used as a streaming ETL pipeline: it consumes messages from Kafka, performs some processing, and publishes the results to other Kafka topics.

Observations:

1. Small batches, high processing time: smaller batches seem to take longer to process than larger batches. This is not expected for streaming applications, where smaller batches are generally faster to process.
2. Increased processing time over time: even with consistent input traffic, the processing time per batch increases over time, leading to lag build-up in Kafka.
3. High CPU usage for some executors: not all executors experience high CPU utilization, suggesting that some tasks within the batches might be causing delays.

From my experience, these observations point towards potential issues with the Spark application's logic or configuration:

- Complex transformations: if the business logic applied to messages within SSS is complex, it could lead to increased processing time, especially for smaller batches. This might explain why smaller batches take longer than expected.
- State management: Spark Structured Streaming can maintain state information across micro-batches. If the state management is not optimized, it could lead to performance degradation over time, especially when dealing with continuous data streams.
- Skewed data or slow tasks: if there is skewed data distribution, or specific tasks within the processing logic are slow, some executors could be overloaded while others remain idle. This could explain the uneven CPU utilization across executors.
- Network timeouts: the document mentions Kafka connection timeout exceptions during message publishing. These timeouts could further contribute to increased processing time and lag build-up.

While the document does not explicitly mention Spark Structured Streaming, the characteristics of the application and the observed behaviour strongly suggest its involvement. The high CPU utilization for executors is likely a consequence of application logic or configuration issues within SSS that lead to slow processing of messages.

My suggestions:

- Analyze the business logic applied to messages within SSS to identify any potential bottlenecks or inefficiencies. Consider simplifying complex transformations or optimizing state management if necessary. Use the Spark UI on port 4040 to spot obvious issues such as slow or skewed tasks that might be overloading specific executors, then optimize those tasks or redistribute them for better load balancing.
- Investigate network timeouts: address the root cause of the Kafka connection timeout exceptions to prevent delays during message publishing.

HTH

Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


On Sat, 25 Jan 2025 at 13:31, Saurabh Agrawal <agrawalsaurabh...@gmail.com> wrote:

Hi Team,

I have been using the Spark Streaming functionality of Spark v3.5 for the use case below, and I am observing the following issue in one of our environments. Any assistance with this would be of valuable help.

Use case:

Smart is used as a Spark streaming application that consumes messages from one source topic, applies business logic to filter out some of these messages, and publishes them accordingly to target topics.

Initial configuration:

Kafka source topic: 24 brokers, 400 partitions
Kafka target topic: 200 partitions
executorsInstances: 15
executor cores: 3
Input rate: 200-250 messages/sec
BackPressure = true
BackPressureInitialRate: 200000 (default in pom)

There is now a requirement for an occasional "ban refresh" trigger, which will push the input rate to 800-1000 messages/sec.

New configuration:

Stage 1:

Kafka source topic: 24 brokers, 1200 partitions
Kafka target topic: 800 partitions
executorsInstances: 120
executor cores: 10
Input rate: 200-250 messages/sec, peaking at 800-1000 messages/sec
BackPressure = true
BackPressureInitialRate: 100
Spark batch schedule time: 0.5 sec
maxRatePerPartition: 1000

All 120 executors come up at startup because of the lag already piled up on Kafka.
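For reference, this is roughly the DStream-on-Kafka wiring the Stage 1 settings describe. It is a minimal sketch, not the application code: the app name, brokers, group id and topic are placeholders, and the rate settings simply mirror the values above.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val conf = new SparkConf()
      .setAppName("smart-stream")                                  // placeholder name
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "100")      // cap for the first batches, before backpressure has feedback
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")    // hard cap: records/sec per partition
    // Upper bound per batch = 1000 records/sec/partition x 1200 partitions x 0.5 sec = 600,000 records.

    val ssc = new StreamingContext(conf, Milliseconds(500))         // 0.5 sec batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",                       // placeholder
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "smart-consumer-group",               // placeholder
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("source-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // filtering business logic and publishing to target topics would go here
    }

    // ssc.start(); ssc.awaitTermination()

With backpressure enabled the effective batch size floats below that upper bound; with it disabled, maxRatePerPartition together with the batch interval is the only guard rail on batch size.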
Problem:

1. When there is already a huge lag in the system, on startup of the application only one message was picked up from each partition because of the backpressure initial rate, so the batch size in the Spark stream was only 1200 records.
2. Over a period of time these delays get cleared, and the system works fine without scheduling delays for a few hours with incoming traffic consistent at 100-200 tps. At this point the batch size drops to 50-100 records/batch.
3. Then a ban refresh suddenly gets triggered and the incoming rate increases to 800-1000 tps. The executors cannot cope and the lag keeps increasing on the Kafka side. The batch processing rate keeps dropping, which builds up scheduling delay and in turn shrinks the batch size to almost 15-20 records/batch.
4. The observation is that small batches take more processing time than big batches.
5. There is also a NETWORK_CONNECTION error when fetching metadata from Kafka while publishing.

Solution:

1. Removed the backpressure initial-rate parameter, so that when there are millions of records already piled up on Kafka, the batches created contain more records per batch than the original 1200 records/batch. This is because batches with more records seem to have better processing time than small batches.
2. Also disabled backpressure (though not ideal), so that we have a consistent records-per-batch size to deal with and better processing time.
3. Handled the NETWORK_CONNECTION issue.

Stage 2:

Kafka source topic: 24 brokers, 1200 partitions
Kafka target topic: 800 partitions
executorsInstances: 120
executor cores: 10
Input rate: 200-250 messages/sec, peaking at 800-1000 messages/sec
BackPressure = false
maxRatePerPartition: 1000
Spark batch schedule time: 0.5 sec

What went well:

1. The lag already accumulated gets released faster with more records per batch (in the millions).
2. The NETWORK_CONNECTION issue was resolved.

Problems that persist:

1. Once the lag is cleared and input traffic is consistent, the processing time still keeps increasing after a period of time, and lag starts to build up on Kafka.
2. At times of ban refresh, when traffic increases, the lag keeps accumulating.
3. When the batches have a high record count, in the initial stage the processing rate from Smart is 2k-4k messages per second. But over time, when the incoming flow is lower (200-400 messages/sec) and batches are around 300 records, the processing rate drops to 100-200 messages per second.
4. This also leads to an increase in lag under high load, as the processing rate is reduced.
5. Not all executors are on high CPU usage; some run at high CPU and some at low CPU.

Solution:

1. Since batches with more records have better processing time, and the overall processing time per batch is higher, increase the batch interval from 0.5 sec to 3 sec so that batches are created with 600-700 records/batch.
2. Reduced executorsInstances to 40 to check the effectiveness of CPU utilization and processing.

Stage 3:

Kafka source topic: 24 brokers, 1200 partitions
Kafka target topic: 800 partitions
executorsInstances: 40
executor cores: 10
Input rate: 200-250 messages/sec, peaking at 800-1000 messages/sec
BackPressure = false
maxRatePerPartition: 1000
Spark batch polling time: 3 sec

What went well:

1. The lag already accumulated gets released faster with more records per batch (in the millions).
2. With a 3 sec polling time, fewer batches get created, with more records per batch.

Problems that persist:

1. As we increase the polling time of batches, when the load is low and the processing time per batch duration drops, some executors scale down because Dynamic Resource Allocation is enabled on Smart.
2. Once the input load increases again, new executors do scale up, but we do not see them at high CPU usage like the other executors.
3. We also see heartbeat-interval exceptions on the driver-executor connection for some executors.
4. Some tasks in a batch take 30 seconds or more on one of the executors, which impacts the overall processing time and delays the next batch. This leads to increased lag on Kafka.

Reason for the connection timeout:

1. Looking at the logs for the 30-second tasks, there were Kafka connection timeout exceptions on publish in these executors, which delay the processing time of the overall batch. We need to resolve this.
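On the publish-side timeouts, a hedged sketch of the producer settings that usually govern metadata fetches and send retries; the broker list is a placeholder and the values are illustrative starting points, not recommendations:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
    import org.apache.kafka.common.serialization.StringSerializer

    // Sketch only: placeholder brokers, illustrative timeouts.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")         // how long send() may block waiting for metadata
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")   // per-request timeout to the broker
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000") // total time allowed for a send, including retries
    props.put(ProducerConfig.RETRIES_CONFIG, "5")
    props.put(ProducerConfig.METADATA_MAX_AGE_MS_CONFIG, "60000")  // refresh metadata proactively

    val producer = new KafkaProducer[String, String](props)

One thing worth checking (without assuming anything about the application code) is whether the producer is reused per executor JVM rather than created per task: re-creating it means paying connection setup and metadata fetches on every batch, which by itself can explain occasional 30-second tasks. The driver-executor heartbeat exceptions usually clear up once the long tasks do; if not, spark.network.timeout (default 120s) must stay comfortably above spark.executor.heartbeatInterval (default 10s).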
Overall memory utilization was not found to be causing any issue. CPU utilization does go high for the executors.

Looking forward to your response.

Thanks & Regards,
Saurabh Agrawal
Cell: (+91) 9049700365

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org