Have you tried setting maxRatePerPartition to a lower value?  Based on
your settings, I believe you will be able to pull up to *600K* messages per
batch from Kafka, basically:

• maxRatePerPartition=15000

• batchInterval 10s

• 4 partitions on Ingest topic


This results in a maximum of 600K records per batch:


• 4 * 10 * 15000 = 600,000 max

Can you reduce maxRatePerPartition to, say, 1500 for a test run?  That
should result in a more manageable batch size, and you can adjust from there
(see the quick sketch after the formula below).


• 4 * 10 * 1500 = 60,000 max
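
For reference, here is that ceiling written out as a quick shell sketch; the
variable names are just illustrative, they are not Spark settings:

   # rough sketch of the per-batch ceiling:
   # partitions * batch interval (seconds) * maxRatePerPartition
   PARTITIONS=4
   BATCH_INTERVAL_SEC=10

   MAX_RATE_PER_PARTITION=15000
   echo $(( PARTITIONS * BATCH_INTERVAL_SEC * MAX_RATE_PER_PARTITION ))   # 600000 -> current ceiling

   MAX_RATE_PER_PARTITION=1500
   echo $(( PARTITIONS * BATCH_INTERVAL_SEC * MAX_RATE_PER_PARTITION ))   # 60000 -> suggested test run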

I know that on our side we are only setting maxRatePerPartition and
backpressure.enabled, not maxRate or initialRate.  I thought maxRate was
not applicable when using back pressure, but I may be mistaken.
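
For what it's worth, my understanding (worth double-checking against the docs
for your Spark version) is that spark.streaming.receiver.maxRate only applies
to receiver-based streams, so the direct Kafka stream ignores it; the hard cap
comes from spark.streaming.kafka.maxRatePerPartition, and backpressure then
adjusts the rate below that cap after the first batch.  A minimal sketch of
the submit command for the test run, reusing the ${...} placeholders from your
script and keeping only the rate-related flags (add the rest of your options
back in as-is):

   # same structure as the full script above; only the Kafka rate settings
   # are changed, using the values suggested earlier in this thread
   spark2-submit --name "${YARN_NAME}" \
      --master yarn \
      --deploy-mode "${DEPLOY_MODE}" \
      --class "${MAIN_CLASS}" \
      --conf "spark.streaming.backpressure.enabled=true" \
      --conf "spark.streaming.backpressure.initialRate=2000" \
      --conf "spark.streaming.kafka.maxRatePerPartition=1500" \
      "${ARTIFACT_FILE}"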


-Todd

On Thu, Jul 26, 2018 at 8:46 AM Biplob Biswas <revolutioni...@gmail.com>
wrote:

> Hi Todd,
>
> Thanks for the reply. I have maxRatePerPartition set as well. Below is the
> spark-submit config we used, and we still got the issue. Also, the *batch
> interval is set at 10s* and the *number of partitions on the topic is set
> to 4*:
>
> spark2-submit --name "${YARN_NAME}" \
>    --master yarn \
>    --deploy-mode ${DEPLOY_MODE} \
>    --num-executors ${NUM_EXECUTORS} \
>    --driver-cores ${NUM_DRIVER_CORES} \
>    --executor-cores ${NUM_EXECUTOR_CORES} \
>    --driver-memory ${DRIVER_MEMORY} \
>    --executor-memory ${EXECUTOR_MEMORY} \
>    --queue ${YARN_QUEUE} \
>    --keytab ${KEYTAB}-yarn \
>    --principal ${PRINCIPAL} \
>    --conf "spark.yarn.preserve.staging.files=true" \
>    --conf "spark.yarn.submit.waitAppCompletion=false" \
>    --conf "spark.shuffle.service.enabled=true" \
>    --conf "spark.dynamicAllocation.enabled=true" \
>    --conf "spark.dynamicAllocation.minExecutors=1" \
>    --conf "spark.streaming.backpressure.enabled=true" \
>    --conf "spark.streaming.receiver.maxRate=15000" \
>    --conf "spark.streaming.kafka.maxRatePerPartition=15000" \
>    --conf "spark.streaming.backpressure.initialRate=2000" \
>    --conf "spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>    --driver-class-path "/opt/cloudera/parcels/CDH/lib/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/" \
>    --driver-java-options "-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf -Dlog4j.configuration=log4j-spark.properties" \
>    --files "${JAAS_CONF},${KEYTAB}" \
>    --class "${MAIN_CLASS}" \
>    "${ARTIFACT_FILE}"
>
>
> The first batch is huge. Even if it had worked for the first batch, I
> would've tried researching more; the problem is that the first batch alone
> is more than 500k records.
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Thu, Jul 26, 2018 at 2:33 PM Todd Nist <tsind...@gmail.com> wrote:
>
>> Hi Biplob,
>>
>> How many partitions are on the topic you are reading from, and have you
>> set maxRatePerPartition?  IIRC, Spark back pressure is calculated as
>> follows:
>>
>> *Spark back pressure:*
>>
>> Back pressure is calculated off of the following:
>>
>>
>> • maxRatePerPartition=200
>>
>> • batchInterval 30s
>>
>> • 3 partitions on Ingest topic
>>
>>
>> This results in a maximum ingest rate of 18K:
>>
>>
>> • 3 * 30 * 200 = 18,000 max
>>
>> The spark.streaming.backpressure.initialRate only applies to the first
>> batch, per docs:
>>
>>
>> This is the initial maximum receiving rate at which each receiver will
>>> receive data for the *first batch* when the backpressure mechanism is
>>> enabled.
>>
>>
>> If you  set the maxRatePerPartition and apply the above formula, I
>> believe you will be able to achieve the results you are looking for.
>>
>> HTH.
>>
>> -Todd
>>
>>
>> On Thu, Jul 26, 2018 at 7:21 AM Biplob Biswas <revolutioni...@gmail.com>
>> wrote:
>>
>>> Did anyone face a similar issue? Is there any viable way to solve this?
>>>
>>> Thanks & Regards
>>> Biplob Biswas
>>>
>>>
>>> On Wed, Jul 25, 2018 at 4:23 PM Biplob Biswas <revolutioni...@gmail.com>
>>> wrote:
>>>
>>>> I have enabled the spark.streaming.backpressure.enabled setting and
>>>> also set spark.streaming.backpressure.initialRate to 15000, but my
>>>> Spark job is not respecting these settings when reading from Kafka after a
>>>> failure.
>>>>
>>>> In my Kafka topic around 500k records are waiting to be processed,
>>>> and they are all read in one huge batch, which ultimately takes a long time
>>>> and fails with an executor failure exception. We don't have more resources
>>>> to give to our test cluster, and we expect backpressure to kick in and take
>>>> smaller batches.
>>>>
>>>> What could I be doing wrong?
>>>>
>>>>
>>>> Thanks & Regards
>>>> Biplob Biswas
>>>>
>>>
