If you're using Spark in batch mode with foreachPartition, you'll want to pass in your options as a function of your app's configuration, not Spark's (e.g. use scopt to pass in a map which then gets passed to your builder/broadcast). If you are using Spark Structured Streaming you can use kafka.* properties, assuming that your serialization concerns have been handled upstream of your write action.
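Roughly, the batch pattern might look like the sketch below (the S3 path, topic, serializers, and config values are placeholders; in practice the map would be built from your own app configuration, e.g. parsed with scopt, rather than hard-coded):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.sql.SparkSession

    // Sketch only: placeholder S3 path, topic, and String payloads; the config
    // map would normally come from your own app configuration (e.g. via scopt).
    val spark = SparkSession.builder().appName("kafka-batch-write").getOrCreate()

    val producerConf: Map[String, String] = Map(
      "bootstrap.servers" -> "localhost:9092",
      "linger.ms"         -> "1000",
      "batch.size"        -> "100000",
      "key.serializer"    -> "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer"  -> "org.apache.kafka.common.serialization.StringSerializer"
    )
    // Broadcast the plain config map so each executor can build its own producer.
    val confBc = spark.sparkContext.broadcast(producerConf)

    val ds = spark.read.textFile("s3a://some-bucket/some-prefix/")

    ds.rdd.foreachPartition { rows =>
      val props = new Properties()
      confBc.value.foreach { case (k, v) => props.put(k, v) }
      // One producer per partition, closed once the partition is done.
      val producer = new KafkaProducer[String, String](props)
      rows.foreach(r => producer.send(new ProducerRecord[String, String]("some-topic", r)))
      producer.flush()
      producer.close()
    }
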
-dan

On Wed, Apr 16, 2025 at 6:27 AM Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Sharing Producer Config on Spark Startup
>
> 25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
>     acks = -1
>     batch.size = 10000000
>     bootstrap.servers = [localhost:9092]
>     buffer.memory = 33554432
>     client.dns.lookup = use_all_dns_ips
>     client.id = producer-1
>     compression.type = none
>     connections.max.idle.ms = 540000
>     delivery.timeout.ms = 120000
>     enable.idempotence = false
>     interceptor.classes = []
>     internal.auto.downgrade.txn.commit = false
>     key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>     linger.ms = 10000
>     max.block.ms = 60000
>     max.in.flight.requests.per.connection = 5
>     max.request.size = 1048576
>     metadata.max.age.ms = 300000
>     metadata.max.idle.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>     receive.buffer.bytes = 32768
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 30000
>     retries = 2147483647
>     retry.backoff.ms = 100
>     sasl.client.callback.handler.class = null
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.login.callback.handler.class = null
>     sasl.login.class = null
>     sasl.login.refresh.buffer.seconds = 300
>     sasl.login.refresh.min.period.seconds = 60
>     sasl.login.refresh.window.factor = 0.8
>     sasl.login.refresh.window.jitter = 0.05
>     sasl.mechanism = GSSAPI
>     security.protocol = PLAINTEXT
>     security.providers = null
>     send.buffer.bytes = 131072
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>     ssl.endpoint.identification.algorithm = https
>     ssl.engine.factory.class = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLSv1.3
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60000
>     transactional.id = null
>     value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
>
> On Wed, Apr 16, 2025 at 5:55 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>
>> Hi Daniel and Jungtaek,
>>
>> I am using Spark in batch. Tried with kafka.<option>, now I can see they
>> are being set in Producer Config on Spark Startup but still they are not
>> being honored. I have set "linger.ms": "1000" and "batch.size": "100000". I
>> am publishing 10 records and they are flushed to kafka server immediately,
>> however kafka producer behaviour when publishing via kafka-clients using
>> foreachPartition is as expected. Am I missing something here or is
>> throttling not supported in the kafka connector?
>>
>> Regards,
>> Abhishek Singla
>>
>> On Thu, Mar 27, 2025 at 4:56 AM daniel williams <daniel.willi...@gmail.com> wrote:
>>
>>> If you're using structured streaming you can pass in options as
>>> kafka.<option> into options as documented. If you're using Spark in batch
>>> form you'll want to do a foreach on a KafkaProducer via a Broadcast.
>>>
>>> All KafkaProducer specific options
>>> <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html>
>>> will need to be prepended by *kafka.*
>>>
>>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>>
>>> On Wed, Mar 26, 2025 at 4:11 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> Sorry I missed this. Did you make sure that you add "kafka." as prefix
>>>> on kafka side config when specifying Kafka source/sink option?
>>>>
>>>> On Mon, Feb 24, 2025 at 10:31 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am using spark to read from S3 and write to Kafka.
>>>>>
>>>>> Spark Version: 3.1.2
>>>>> Scala Version: 2.12
>>>>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>>>>
>>>>> I want to throttle kafka producer. I tried using *linger.ms* and
>>>>> *batch.size* config but I can see in *ProducerConfig: ProducerConfig values*
>>>>> at runtime that they are not being set. Is there something I am missing?
>>>>> Is there any other way to throttle kafka writes?
>>>>>
>>>>> *dataset.write().format("kafka").options(options).save();*
>>>>>
>>>>> Regards,
>>>>> Abhishek Singla
>>>>>
>>>
>>> --
>>> -dan
>>
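
For reference, a minimal sketch of the kafka.-prefixed option style discussed in the thread, on the batch write path. The broker, topic, and values here are placeholders, and the sketch assumes the DataFrame already has a serialized "value" column (and optionally "key"); as the thread shows, seeing these values in ProducerConfig does not by itself guarantee the producer batching settings behave as an end-to-end throttle.

    import org.apache.spark.sql.DataFrame

    // Sketch only: producer-level settings are forwarded to the underlying
    // KafkaProducer by prefixing them with "kafka.".
    def writeToKafka(df: DataFrame): Unit = {
      df.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "some-topic")
        .option("kafka.linger.ms", "1000")    // producer option, passed through via the kafka. prefix
        .option("kafka.batch.size", "100000") // producer option, passed through via the kafka. prefix
        .save()
    }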