@daniel williams <daniel.willi...@gmail.com>
It's batch and not streaming. I still don't understand why "kafka.linger.ms"
and "kafka.batch.size" are not being honored by the kafka connector.

@rommelhol...@gmail.com <rommelhol...@gmail.com>
How did you fix it?

@Jungtaek Lim <kabhwan.opensou...@gmail.com>
Could you help out here in case I am missing something?

Regards,
Abhishek Singla

On Thu, Apr 17, 2025 at 6:25 AM daniel williams <daniel.willi...@gmail.com>
wrote:

> The contract between the two is a Dataset, yes; but did you construct the
> former via readStream? If not, it’s still batch.
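>
> A minimal sketch of the distinction (the paths, topics, and the value
> column here are placeholder assumptions, not from your setup):
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
>
> static void sketch(SparkSession spark) throws Exception {
>   // Batch: spark.read() yields a one-shot Dataset; write() runs once and exits.
>   Dataset<Row> batch = spark.read().format("parquet").load("/data/in");
>   batch.write().format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("topic", "test-topic")
>       .save();
>
>   // Streaming: only a Dataset built via readStream() supports writeStream().
>   Dataset<Row> stream = spark.readStream().format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("subscribe", "in-topic")
>       .load();
>   stream.select("value").writeStream().format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option("topic", "test-topic")
>       .option("checkpointLocation", "/tmp/ckpt")
>       .start();
> }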
>
> -dan
>
>
> On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>>
>> Implementation via Kafka Connector
>>
>> <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>>     <version>3.1.2</version>
>> </dependency>
>>
>> private static void publishViaKafkaConnector(Dataset<Row> dataset,
>>                                              Map<String, String> conf) {
>>   dataset.write().format("kafka").options(conf).save();
>> }
>>
>> conf = {
>>            "kafka.bootstrap.servers": "localhost:9092",
>>            "kafka.acks": "all",
>>            "topic": "test-topic",
>>            "kafka.linger.ms": "10000",
>>            "kafka.batch.size": "10000000"
>>        }
>>
>> "kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer 
>> Config (verified via startup logs) but they are not being honored.
>>
>> ========================================================================================================================
>>
>> Implementation via Kafka Client
>>
>> <dependency>
>>     <groupId>org.apache.kafka</groupId>
>>     <artifactId>kafka-clients</artifactId>
>>     <version>2.6.3</version>
>> </dependency>
>>
>> private static void publishViaKafkaClient(Dataset<Row> dataset,
>>                                           Map<String, String> conf) {
>>   dataset.foreachPartition(
>>       (ForeachPartitionFunction<Row>)
>>           rows -> {
>>             // One producer per Spark partition task, built from the conf map.
>>             Properties props = new Properties();
>>             props.putAll(conf);
>>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>             int partitionCount = producer.partitionsFor("test-topic").size();
>>             System.out.println("Partition count: " + partitionCount);
>>
>>             // Remember the last send per Kafka partition so its ack can be awaited.
>>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>>             int counter = 0;
>>             ObjectMapper mapper = new ObjectMapper();
>>             while (rows.hasNext()) {
>>               Row row = rows.next();
>>               // Round-robin records across the topic's partitions.
>>               int partitionId = counter % partitionCount;
>>               String value = mapper.writeValueAsString(row.get(0));
>>               Future<RecordMetadata> future =
>>                   producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
>>               lastRecordMetadata.put(partitionId, future);
>>               System.out.println("Sent message: null -> " + value);
>>               counter++;
>>             }
>>
>>             // Await the ack of the last record sent to each partition.
>>             lastRecordMetadata.forEach(
>>                 (partition, future) -> {
>>                   try {
>>                     RecordMetadata metadata = future.get();
>>                     System.out.println("Ack Received: " + metadata);
>>                   } catch (Exception e) {
>>                     e.printStackTrace();
>>                   }
>>                 });
>>
>>             System.out.println("All messages acknowledged by Kafka Server");
>>             producer.close();
>>           });
>> }
>>
>> conf = {
>>            "bootstrap.servers": "localhost:9092",
>>            "acks": "all",
>>            "linger.ms": "1000",
>>            "batch.size": "100000",
>>            "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
>>            "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
>>        }
>>
>> "linger.ms" and "batch.size" are successfully set in Producer Config 
>> (verified via startup logs) and are being honored.
>>
>> ========================================================================================================================
>>
>>
>> Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size" not
>> honored by the kafka connector?
>>
>> Regards,
>> Abhishek Singla
>>
>>
>> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <
>> daniel.willi...@gmail.com> wrote:
>>
>>> If you are broadcasting a set of options and constructing a producer from
>>> them, then the producer is merely going to operate however it is
>>> configured - it has nothing to do with Spark, save that foreachPartition
>>> constructs it via the broadcast.
>>>
>>> A strategy I’ve used in the past is to:
>>> * increase the memory pool for asynchronous processing
>>> * create multiple broadcast producers and pick one at random per send to
>>> balance the asynchronous sending across more thread pools (a sketch of
>>> this follows below)
>>> * implement back pressure via an adapter class that captures errors
>>>
>>> These are the same things you would want to consider when writing any
>>> high-volume Kafka-based application.
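>>>
>>> A rough sketch of the multiple-producer idea (ProducerPool and poolSize
>>> are hypothetical names, not an existing API):
>>>
>>> import java.io.Serializable;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>> import java.util.Properties;
>>> import java.util.concurrent.ThreadLocalRandom;
>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>
>>> // Broadcast the serializable conf; create the non-serializable producers
>>> // lazily, once per executor JVM, then pick one at random for each send.
>>> public class ProducerPool implements Serializable {
>>>   private final Map<String, String> conf;
>>>   private final int poolSize;
>>>   private transient List<KafkaProducer<String, String>> producers;
>>>
>>>   public ProducerPool(Map<String, String> conf, int poolSize) {
>>>     this.conf = conf;
>>>     this.poolSize = poolSize;
>>>   }
>>>
>>>   public synchronized KafkaProducer<String, String> get() {
>>>     if (producers == null) {
>>>       Properties props = new Properties();
>>>       props.putAll(conf);
>>>       producers = new ArrayList<>(poolSize);
>>>       for (int i = 0; i < poolSize; i++) {
>>>         producers.add(new KafkaProducer<>(props));
>>>       }
>>>     }
>>>     // Each producer has its own sender thread, so random access spreads
>>>     // the asynchronous sends across the pool.
>>>     return producers.get(ThreadLocalRandom.current().nextInt(poolSize));
>>>   }
>>> }
>>>
>>> Inside foreachPartition you would then call broadcastPool.value().get()
>>> rather than constructing a new producer in every partition.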
>>>
>>>
>>>
>>> -dan
>>>
>>>
>>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
>>> abhisheksingla...@gmail.com> wrote:
>>>
>>>> Yes, producing via kafka-clients using foreachPartition works as
>>>> expected. The Kafka producer is initialised within the
>>>> call(Iterator<T> t) method.
>>>>
>>>> The issue is with using the kafka connector in Spark batch mode. The
>>>> configs are not honored even though they are set in ProducerConfig,
>>>> which means the record production rate cannot be controlled via the
>>>> kafka connector in Spark batch. That can cause in-sync replicas to lag
>>>> if they cannot catch up, and eventually the Kafka server will fail
>>>> writes once the in-sync replica count drops below the required
>>>> min.insync.replicas. Is there any way to solve this using the kafka
>>>> connector?
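>>>>
>>>> For context, a sketch of that failure mode (the values here are
>>>> illustrative, assuming a topic with replication.factor=3):
>>>>
>>>> topic config  = { "min.insync.replicas": "2" }
>>>> producer conf = { "acks": "all" }
>>>>
>>>> With acks=all, if lagging followers fall out of the ISR and it shrinks
>>>> below min.insync.replicas, the broker rejects produce requests with
>>>> NotEnoughReplicasException until the ISR recovers.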
>>>>
>>>> Regards,
>>>> Abhishek Singla
>>>>
>>>
