@Abhishek Singla <abhisheksingla...@gmail.com> kafka.* options are for
streaming applications, not batch. This is clearly documented on the
Structured Streaming pages covering the Kafka integration. If you are
transmitting data in a batch application, it's best to do so via a
foreachPartition operation, as discussed, leveraging broadcast variables to
control your KafkaProducers.
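
For illustration, here is a minimal sketch of that broadcast pattern, not a
definitive implementation. LazyClient is a hypothetical name (it is not part
of Spark or kafka-clients), and a plain String stands in for the
KafkaProducer so the snippet runs without any dependencies. The idea is that
only the config map is serialized with the broadcast, and each executor JVM
builds its own producer on first use.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical serializable holder: the config and the factory are shipped,
 * the client itself is transient and is created lazily on first get().
 */
final class LazyClient<C> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** A Function that is also Serializable, so it can travel with the holder. */
    interface Factory<T> extends Function<Map<String, String>, T>, Serializable {}

    private final HashMap<String, String> config;
    private final Factory<C> factory;
    private transient volatile C client; // never serialized

    LazyClient(Map<String, String> config, Factory<C> factory) {
        this.config = new HashMap<>(config);
        this.factory = factory;
    }

    /** Returns the client, creating it at most once per holder instance. */
    C get() {
        C local = client;
        if (local == null) {
            synchronized (this) {
                local = client;
                if (local == null) {
                    local = factory.apply(config);
                    client = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("bootstrap.servers", "localhost:9092");
        conf.put("linger.ms", "1000");
        conf.put("batch.size", "100000");

        AtomicInteger created = new AtomicInteger();
        // Stand-in factory (assumption): a real job would copy cfg into
        // Properties and return new KafkaProducer<>(props) here.
        LazyClient<String> holder = new LazyClient<>(conf, cfg -> {
            created.incrementAndGet();
            return "producer(linger.ms=" + cfg.get("linger.ms") + ")";
        });

        System.out.println(holder.get());
        System.out.println(holder.get()); // reuses the same instance
        System.out.println("clients created: " + created.get());
    }
}
```

In a Spark job the holder would be broadcast once with
JavaSparkContext.broadcast(...), and each foreachPartition call would fetch
the producer through the broadcast's value().get() instead of constructing
and closing a new producer per partition. The factory lambda must not capture
anything that is not serializable.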

On Thu, Apr 17, 2025 at 4:27 PM Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

> @daniel williams <daniel.willi...@gmail.com>
> It's batch and not streaming. I still don't understand why
> "kafka.linger.ms" and "kafka.batch.size" are not being honored by the
> Kafka connector.
>
> @rommelhol...@gmail.com <rommelhol...@gmail.com>
> How did you fix it?
>
> @Jungtaek Lim <kabhwan.opensou...@gmail.com>
> Could you help out here in case I am missing something?
>
> Regards,
> Abhishek Singla
>
> On Thu, Apr 17, 2025 at 6:25 AM daniel williams <daniel.willi...@gmail.com>
> wrote:
>
>> The contract between the two is a dataset, yes; but did you construct the
>> former via readStream? If not, it’s still batch.
>>
>> -dan
>>
>>
>> On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <
>> abhisheksingla...@gmail.com> wrote:
>>
>>>
>>> Implementation via Kafka Connector
>>>
>>> <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>>>     <version>3.1.2</version>
>>> </dependency>
>>>
>>> private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
>>>   dataset.write().format("kafka").options(conf).save();
>>> }
>>>
>>> conf = {
>>>            "kafka.bootstrap.servers": "localhost:9092",
>>>            "kafka.acks": "all",
>>>            "topic": "test-topic",
>>>            "kafka.linger.ms": "10000",
>>>            "kafka.batch.size": "10000000"
>>>        }
>>>
>>> "kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer 
>>> Config (verified via startup logs) but they are not being honored.
>>>
>>> ========================================================================================================================
>>>
>>> Implementation via Kafka Client
>>>
>>> <dependency>
>>>     <groupId>org.apache.kafka</groupId>
>>>     <artifactId>kafka-clients</artifactId>
>>>     <version>2.6.3</version>
>>> </dependency>
>>>
>>> private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
>>>   dataset.foreachPartition(
>>>       (ForeachPartitionFunction<Row>)
>>>           rows -> {
>>>             Properties props = new Properties();
>>>             props.putAll(conf);
>>>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>>             int partitionCount = producer.partitionsFor("test-topic").size();
>>>             System.out.println("Partition count: " + partitionCount);
>>>
>>>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>>>             int counter = 0;
>>>             ObjectMapper mapper = new ObjectMapper();
>>>             while (rows.hasNext()) {
>>>               Row row = rows.next();
>>>               int partitionId = counter % partitionCount;
>>>               String value = mapper.writeValueAsString(row.get(0));
>>>               Future<RecordMetadata> future =
>>>                   producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
>>>               lastRecordMetadata.put(partitionId, future);
>>>               System.out.println("Sent message: " + null + " -> " + value);
>>>               counter++;
>>>             }
>>>
>>>             lastRecordMetadata.forEach(
>>>                 (key, value) -> {
>>>                   try {
>>>                     RecordMetadata metadata = value.get();
>>>                     System.out.println("Ack Received: " + metadata.toString());
>>>                   } catch (Exception e) {
>>>                     e.printStackTrace();
>>>                   }
>>>                 });
>>>
>>>             System.out.println("All messages acknowledged by Kafka Server");
>>>             producer.close();
>>>           });
>>> }
>>>
>>> conf = {
>>>            "bootstrap.servers": "localhost:9092",
>>>            "acks": "all",
>>>            "linger.ms": "1000",
>>>            "batch.size": "100000",
>>>            "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
>>>            "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
>>>        }
>>>
>>> "linger.ms" and "batch.size" are successfully set in Producer Config 
>>> (verified via startup logs) and are being honored.
>>>
>>> ========================================================================================================================
>>>
>>>
>>> Now, the question is why "kafka.linger.ms" and "kafka.batch.size" are not 
>>> being honored by kafka connector?
>>>
>>> Regards,
>>> Abhishek Singla
>>>
>>>
>>> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <
>>> daniel.willi...@gmail.com> wrote:
>>>
>>>> If you are building a broadcast to construct a producer with a set of
>>>> options, then the producer will simply operate as it is configured. It
>>>> has nothing to do with Spark, except that the foreachPartition is
>>>> constructing it via the broadcast.
>>>>
>>>> A strategy I’ve used in the past is to
>>>> * increase memory pool for asynchronous processing
>>>> * make multiple broadcast producers and randomly access the producer to
>>>> balance the asynchronous sending across more thread pools
>>>> * implement back pressure via an adapter class to capture errors
>>>>
>>>> These are the same things you would want to consider while writing a
>>>> high volume Kafka based application
>>>>
>>>>
>>>>
>>>> -dan
>>>>
>>>>
>>>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
>>>> abhisheksingla...@gmail.com> wrote:
>>>>
>>>>> Yes, producing via kafka-clients using foreachPartition works as
>>>>> expected. The Kafka producer is initialised within the
>>>>> call(Iterator<T> t) method.
>>>>>
>>>>> The issue is with using the Kafka connector with Spark batch. The configs
>>>>> are not being honored even though they are set in ProducerConfig. This
>>>>> means the Kafka record production rate cannot be controlled via the Kafka
>>>>> connector in Spark batch. This can lead to lag in the in-sync replicas if
>>>>> they are not able to catch up, and eventually to the Kafka server failing
>>>>> writes if the in-sync replica count drops below the required minimum. Is
>>>>> there any way to solve this using the Kafka connector?
>>>>>
>>>>> Regards,
>>>>> Abhishek Singla
>>>>>
>>>>

-- 
-dan
