Have you tried reducing the maxRatePerPartition to a lower value? Based on
your settings, I believe you are going to be able to pull up to *600K*
messages from Kafka in a single batch. Basically:

• maxRatePerPartition=15000
• batchInterval 10s
• 4 partitions on Ingest topic

This results in a maximum ingest of 600K records per batch:

• 4 * 10 * 15000 = 600,000 max

Can you reduce the maxRatePerPartition to, say, 1500 for a test run? That
should result in a more manageable batch, and you can adjust from there
(sketch below):

• 4 * 10 * 1500 = 60,000 max
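Something along these lines — a trimmed sketch of your own submit script
below, with only the per-partition cap changed:

    # Only the per-partition cap is lowered (was 15000); all other flags
    # from your script below stay exactly as they are.
    spark2-submit --name "${YARN_NAME}" \
      --master yarn \
      --conf "spark.streaming.backpressure.enabled=true" \
      --conf "spark.streaming.kafka.maxRatePerPartition=1500" \
      --class "${MAIN_CLASS}" \
      "${ARTIFACT_FILE}"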
I know we are not setting the maxRate or initialRate, only the
maxRatePerPartition and backpressure.enabled. I thought that maxRate was
not applicable when using back pressure, but I may be mistaken.

-Todd

On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas <revolutioni...@gmail.com> wrote:

> Hi Todd,
>
> Thanks for the reply. I have the maxRatePerPartition set as well. Below
> is the spark-submit config we used, and we still got the issue. Also, the
> *batch interval is set at 10s* and the *number of partitions on the topic
> is set to 4*:
>
> spark2-submit --name "${YARN_NAME}" \
> --master yarn \
> --deploy-mode ${DEPLOY_MODE} \
> --num-executors ${NUM_EXECUTORS} \
> --driver-cores ${NUM_DRIVER_CORES} \
> --executor-cores ${NUM_EXECUTOR_CORES} \
> --driver-memory ${DRIVER_MEMORY} \
> --executor-memory ${EXECUTOR_MEMORY} \
> --queue ${YARN_QUEUE} \
> --keytab ${KEYTAB}-yarn \
> --principal ${PRINCIPAL} \
> --conf "spark.yarn.preserve.staging.files=true" \
> --conf "spark.yarn.submit.waitAppCompletion=false" \
> --conf "spark.shuffle.service.enabled=true" \
> --conf "spark.dynamicAllocation.enabled=true" \
> --conf "spark.dynamicAllocation.minExecutors=1" \
> --conf "spark.streaming.backpressure.enabled=true" \
> --conf "spark.streaming.receiver.maxRate=15000" \
> --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
> --conf "spark.streaming.backpressure.initialRate=2000" \
> --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
> --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
> --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
> --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
> --files "${JAAS_CONF},${KEYTAB}" \
> --class "${MAIN_CLASS}" \
> "${ARTIFACT_FILE}"
>
> The first batch is huge; if it had at least worked for the first batch, I
> would have kept researching on my own. The problem is that the first batch
> is more than 500k records.
>
> Thanks & Regards
> Biplob Biswas
>
> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:
>
>> Hi Biplob,
>>
>> How many partitions are on the topic you are reading from, and have you
>> set the maxRatePerPartition? IIRC, Spark back pressure is calculated as
>> follows:
>>
>> *Spark back pressure:*
>>
>> Back pressure is calculated off of the following:
>>
>> • maxRatePerPartition=200
>> • batchInterval 30s
>> • 3 partitions on Ingest topic
>>
>> This results in a maximum ingest rate of 18K:
>>
>> • 3 * 30 * 200 = 18,000 max
>>
>> The spark.streaming.backpressure.initialRate only applies to the first
>> batch, per the docs:
>>
>>> This is the initial maximum receiving rate at which each receiver will
>>> receive data for the *first batch* when the backpressure mechanism is
>>> enabled.
>>
>> If you set the maxRatePerPartition and apply the above formula, I
>> believe you will be able to achieve the results you are looking for.
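>> As a quick back-of-envelope check (plain shell arithmetic; these are
>> just the example values above, not your actual settings):
>>
>>   # max records per batch = partitions * batch interval (s) * maxRatePerPartition
>>   echo $((3 * 30 * 200))   # prints 18000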
>> HTH.
>>
>> -Todd
>>
>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
>> wrote:
>>
>>> Did anyone face a similar issue? And is there any viable way to solve
>>> this?
>>>
>>> Thanks & Regards
>>> Biplob Biswas
>>>
>>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>>> wrote:
>>>
>>>> I have enabled the spark.streaming.backpressure.enabled setting and
>>>> also set spark.streaming.backpressure.initialRate to 15000, but my
>>>> Spark job is not respecting these settings when reading from Kafka
>>>> after a failure.
>>>>
>>>> In my Kafka topic, around 500k records are waiting to be processed,
>>>> and they are all taken in one huge batch, which ultimately takes a
>>>> long time and fails with an executor failure exception. We don't have
>>>> more resources to give in our test cluster, and we expect the back
>>>> pressure to kick in and take smaller batches.
>>>>
>>>> What could I be doing wrong?
>>>>
>>>> Thanks & Regards
>>>> Biplob Biswas
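>>>> P.S. For concreteness, a sketch of the relevant flags as we pass them
>>>> to spark-submit (values as described above; the full command is in my
>>>> later reply):
>>>>
>>>>   --conf "spark.streaming.backpressure.enabled=true" \
>>>>   --conf "spark.streaming.backpressure.initialRate=15000" \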