If you’re using in batch mode with foreachPartition you’ll want to pass in
your options as a function of the configuration of your app, not spark
(i.e. use scopt to pass in a map which then gets passed to your
builder/broadxast). If you are using spark structured streaming you can use
Kafka.* properties assuming that your serialization concerns have been
handled upstream of your write action.

-dan


On Wed, Apr 16, 2025 at 6:27 AM Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

> Sharing Producer Config on Spark Startup
>
> 25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
> acks = -1
> batch.size = 10000000
> bootstrap.servers = [localhost:9092]
> buffer.memory = 33554432
> client.dns.lookup = use_all_dns_ips
> client.id = producer-1
> compression.type = none
> connections.max.idle.ms = 540000
> delivery.timeout.ms = 120000
> enable.idempotence = false
> interceptor.classes = []
> internal.auto.downgrade.txn.commit = false
> key.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
> linger.ms = 10000
> max.block.ms = 60000
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.max.age.ms = 300000
> metadata.max.idle.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retries = 2147483647
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> security.providers = null
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> transaction.timeout.ms = 60000
> transactional.id = null
> value.serializer = class
> org.apache.kafka.common.serialization.ByteArraySerializer
>
> On Wed, Apr 16, 2025 at 5:55 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Hi Daniel and Jungtaek,
>>
>> I am using Spark in batch. Tried with kafka.<option>, now I can see they
>> are being set in Producer Config on Spark Startup but still they are not
>> being honored. I have set "linger.ms": "1000" and "batch.size": "100000". I
>> am publishing 10 records and they are flushed to kafka server immediately,
>> however kafka producer behaviour when publishing via kafka-clients using
>> foreachPartition is as expected. Am I missing something here or is
>> throttling not supported in the kafka connector?
>>
>> Regards,
>> Abhishek Singla
>>
>> On Thu, Mar 27, 2025 at 4:56 AM daniel williams <
>> daniel.willi...@gmail.com> wrote:
>>
>>> If you're using structured streaming you can pass in options as
>>> kafka.<option> into options as documented. If you're using Spark in batch
>>> form you'll want to do a foreach on a KafkaProducer via a Broadcast.
>>>
>>> All KafkaProducer specific options
>>> <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html>
>>>  will
>>> need to be prepended by *kafka.*
>>>
>>>
>>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>>
>>>
>>> On Wed, Mar 26, 2025 at 4:11 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Sorry I missed this. Did you make sure that you add "kafka." as prefix
>>>> on kafka side config when specifying Kafka source/sink option?
>>>>
>>>> On Mon, Feb 24, 2025 at 10:31 PM Abhishek Singla <
>>>> abhisheksingla...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am using spark to read from S3 and write to Kafka.
>>>>>
>>>>> Spark Version: 3.1.2
>>>>> Scala Version: 2.12
>>>>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>>>>
>>>>> I want to throttle kafka producer. I tried using *linger.ms
>>>>> <http://linger.ms>* and *batch.size* config but I can see in 
>>>>> *ProducerConfig:
>>>>> ProducerConfig values* at runtime that they are not being set. Is
>>>>> there something I am missing? Is there any other way to throttle kafka
>>>>> writes?
>>>>>
>>>>> *dataset.write().format("kafka").options(options).save();*
>>>>>
>>>>> Regards,
>>>>> Abhishek Singla
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> -dan
>>>
>>

Reply via email to