Re: Kafka Connector: producer throttling

2025-04-17 Thread daniel williams
@Abhishek Singla kafka.* options are for streaming applications, not batch. This is clearly documented on the Structured Streaming Kafka integration pages. If you are transmitting data in a batch application, it's best to do so via a foreachPartition operation, as discussed, leveraging a broadcast.
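For concreteness, a minimal sketch of that pattern (broadcast the producer config, build one KafkaProducer per partition); the broker address, topic name, serializers, and class name are assumptions for illustration, not part of the thread:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class BroadcastProducerPublish {
      public static void publish(SparkSession spark, Dataset<Row> dataset) {
        // Producer settings live in app config, not in Spark options, so they are
        // honored exactly as written (linger.ms/batch.size control batching here).
        Map<String, Object> producerConf = new HashMap<>();
        producerConf.put("bootstrap.servers", "localhost:9092");  // assumed broker
        producerConf.put("linger.ms", "1000");
        producerConf.put("batch.size", "10");
        producerConf.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        producerConf.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        Broadcast<Map<String, Object>> confBc =
            new JavaSparkContext(spark.sparkContext()).broadcast(producerConf);

        dataset.toJSON().foreachPartition(rows -> {
          // One producer per partition, configured from the broadcast map.
          try (KafkaProducer<String, String> producer =
                   new KafkaProducer<>(new HashMap<>(confBc.value()))) {
            while (rows.hasNext()) {
              producer.send(new ProducerRecord<>("my-topic", rows.next()));  // assumed topic
            }
            producer.flush();
          }
        });
      }
    }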

Re: Kafka Connector: producer throttling

2025-04-17 Thread Rommel Yuan
I noticed this problem about a year ago; I just didn't pursue it further due to other issues. The solution is to broadcast the Kafka config and create a producer in each partition. It would be great, though, if the config were simply honored natively.

Re: Kafka Connector: producer throttling

2025-04-17 Thread Abhishek Singla
@daniel williams It's batch, not streaming. I still don't understand why "kafka.linger.ms" and "kafka.batch.size" are not being honored by the Kafka connector. @rommelhol...@gmail.com How did you fix it? @Jungtaek Lim Could you help out here in case I am missing something? Regards, Abhishek

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
The contract between the two is a dataset, yes; but did you construct the former via readStream? If not, it's still batch. -dan

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Implementation via Kafka Connector

Dependency: org.apache.spark : spark-sql-kafka-0-10_${scala.version} : 3.1.2

private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
    dataset.write().format("kafka").options(conf).save();
}

conf = { "kafka.bootstrap.servers": "localh
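A self-contained sketch of the write above with the conf map spelled out; kafka.linger.ms and kafka.batch.size use the values mentioned later in this thread, and the topic name is an assumption:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class KafkaConnectorBatchWrite {
      // Expects a dataset with a "value" column (and optionally "key"/"topic").
      static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
        dataset.write().format("kafka").options(conf).save();
      }

      static Map<String, String> conf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("kafka.bootstrap.servers", "localhost:9092");
        conf.put("topic", "my-topic");         // assumed topic name
        conf.put("kafka.linger.ms", "1000");   // producer options need the "kafka." prefix
        conf.put("kafka.batch.size", "10");
        return conf;
      }
    }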

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
If you are building a broadcast to construct a producer with a set of options, then the producer is simply going to operate however it is configured - it has nothing to do with Spark, save that the foreachPartition is constructing it via the broadcast. A strategy I've used in the past is to *

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Yes, producing via kafka-clients using foreachPartition works as expected; the Kafka producer is initialised within the call(Iterator t) method. The issue is with using the Kafka connector with Spark batch: the configs are not being honored even though they are being set in ProducerConfig. This means kafka reco

Re: Kafka Connector: producer throttling

2025-04-16 Thread daniel williams
If you're using Spark in batch mode with foreachPartition, you'll want to pass in your options as a function of your app's configuration, not Spark's (i.e. use scopt to pass in a map which then gets passed to your builder/broadcast). If you are using Spark Structured Streaming you can use kafka.* properties
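A rough Java sketch of that batch-mode point (the thread mentions scopt on the Scala side): build the producer map from the application's own arguments and broadcast it for later use inside foreachPartition. The --kafka flag format and method names are made up for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.SparkSession;

    public class ProducerConfFromAppArgs {
      // Turns "--kafka linger.ms=1000 --kafka batch.size=10" style arguments into a
      // producer config map and broadcasts it; the producer built from this map in
      // each partition will honor these settings directly.
      static Broadcast<Map<String, Object>> broadcastProducerConf(SparkSession spark, String[] args) {
        Map<String, Object> conf = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
          if ("--kafka".equals(args[i])) {
            String[] kv = args[i + 1].split("=", 2);
            conf.put(kv[0], kv[1]);
          }
        }
        return new JavaSparkContext(spark.sparkContext()).broadcast(conf);
      }
    }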

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Sharing Producer Config on Spark Startup:

25/04/16 15:11:59 INFO ProducerConfig: ProducerConfig values:
    acks = -1
    batch.size = 1000
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.id

Re: Kafka Connector: producer throttling

2025-04-16 Thread Abhishek Singla
Hi Daniel and Jungtaek, I am using Spark in batch. I tried with the kafka. prefix; now I can see the options being set in ProducerConfig on Spark startup, but they are still not being honored. I have set "linger.ms": "1000" and "batch.size": "10". I am publishing 10 records and they are flushed to the Kafka server

Re: Kafka Connector: producer throttling

2025-03-26 Thread daniel williams
If you're using structured streaming you can pass producer settings into the options as kafka.-prefixed keys, as documented. If you're using Spark in batch form you'll want to do a foreachPartition with a KafkaProducer built via a Broadcast. All KafkaProducer-specific options
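For the streaming case, a minimal sketch of what that looks like; the topic name and checkpoint path are assumptions:

    import java.util.concurrent.TimeoutException;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class StreamingKafkaSink {
      // Producer settings are passed with the "kafka." prefix on the sink options.
      static StreamingQuery start(Dataset<Row> stream) throws TimeoutException {
        return stream.writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("kafka.linger.ms", "1000")
            .option("kafka.batch.size", "10")
            .option("topic", "my-topic")
            .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
            .start();
      }
    }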

Re: Kafka Connector: producer throttling

2025-03-26 Thread Jungtaek Lim
Sorry I missed this. Did you make sure to add "kafka." as a prefix on the Kafka-side configs when specifying Kafka source/sink options? On Mon, Feb 24, 2025 at 10:31 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote: > Hi Team, > I am using Spark to read from S3 and write to Kafka. > Spar

Re: Kafka Connector: producer throttling

2025-03-26 Thread Rommel Yuan
It's been a month; I guess the answer is no. I have been running into the same issue, so I guess building a Kafka client directly is the only option. Rommel

Re: Kafka Connector: producer throttling

2025-02-24 Thread Abhishek Singla
Isn't there a way to do it with the Kafka connector instead of the Kafka client? Isn't there any way to throttle the Kafka connector? It seems like a common problem. Regards, Abhishek Singla

Re: Kafka Connector: producer throttling

2025-02-24 Thread daniel williams
I think you should be using a foreachPartition and a broadcast to build your producer. From there you will have full control of all the options and serialization needed via direct access to the KafkaProducer, as well as everything associated with it (e.g. callbacks, interceptors, etc). -dan
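A minimal sketch of that per-partition producer, including a send callback to show the kind of control it gives; the topic and class name are assumptions, and the Properties are expected to carry bootstrap.servers, serializers, and any throttling settings such as linger.ms and batch.size:

    import java.util.Iterator;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.function.ForeachPartitionFunction;

    public class PartitionProducer implements ForeachPartitionFunction<String> {
      private final Properties props;  // producer config, built from app configuration

      public PartitionProducer(Properties props) {
        this.props = props;
      }

      @Override
      public void call(Iterator<String> records) {
        // One producer per partition; closing it flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          while (records.hasNext()) {
            producer.send(new ProducerRecord<>("my-topic", records.next()),
                (metadata, exception) -> {  // per-record callback
                  if (exception != null) {
                    System.err.println("send failed: " + exception);
                  }
                });
          }
          producer.flush();
        }
      }
    }

It would be invoked along the lines of dataset.toJSON().foreachPartition(new PartitionProducer(props)), with the properties shipped via a broadcast or the closure.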