The contract between the two is a Dataset, yes; but did you construct the
former via readStream? If not, it’s still batch.
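
A minimal sketch of the distinction, assuming a SparkSession named "spark"
(the topic names here are illustrative, not from your code):

// Batch: read() gives a one-shot Dataset; write().save() runs once.
Dataset<Row> batchDs = spark.read().format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "in-topic")
    .load();
batchDs.write().format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "out-topic")
    .save();

// Streaming: only readStream() yields a streaming Dataset, and the write
// goes through writeStream() with a required checkpoint location.
Dataset<Row> streamDs = spark.readStream().format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "in-topic")
    .load();
streamDs.writeStream().format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "out-topic")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start();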

-dan


On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

>
> Implementation via Kafka Connector
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>     <version>3.1.2</version>
> </dependency>
>
> // Batch write via Spark's Kafka sink; options prefixed with "kafka."
> // are passed through to the underlying producer config.
> private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
>   dataset.write().format("kafka").options(conf).save();
> }
>
> conf = {
>            "kafka.bootstrap.servers": "localhost:9092",
>            "kafka.acks": "all",
>            "topic": "test-topic",
>            "kafka.linger.ms": "10000",
>            "kafka.batch.size": "10000000"
>        }
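>
> For reference, a usage sketch of the above in Java (java.util.Map.of
> assumes Java 9+; the values are copied from the conf shown):
>
> Map<String, String> conf = Map.of(
>     "kafka.bootstrap.servers", "localhost:9092",
>     "kafka.acks", "all",
>     "topic", "test-topic",
>     "kafka.linger.ms", "10000",
>     "kafka.batch.size", "10000000");
> publishViaKafkaConnector(dataset, conf);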
>
> "kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer 
> Config (verified via startup logs) but they are not being honored.
>
> ========================================================================================================================
>
> Implementation via Kafka Client
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>2.6.3</version>
> </dependency>
>
> private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
>   dataset.foreachPartition(
>       (ForeachPartitionFunction<Row>)
>           rows -> {
>             Properties props = new Properties();
>             props.putAll(conf);
>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>             int partitionCount = producer.partitionsFor("test-topic").size();
>             System.out.println("Partition count: " + partitionCount);
>
>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>             int counter = 0;
>             ObjectMapper mapper = new ObjectMapper();
>             while (rows.hasNext()) {
>               Row row = rows.next();
>               int partitionId = counter % partitionCount;
>               String value = mapper.writeValueAsString(row.get(0));
>               Future<RecordMetadata> future =
>                   producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
>               lastRecordMetadata.put(partitionId, future);
>               System.out.println("Sent message: " + null + " -> " + value);
>               counter++;
>             }
>
>             lastRecordMetadata.forEach(
>                 (key, value) -> {
>                   try {
>                     RecordMetadata metadata = value.get();
>                     System.out.println("Ack Received: " + metadata.toString());
>                   } catch (Exception e) {
>                     e.printStackTrace();
>                   }
>                 });
>
>             System.out.println("All messages acknowledged by Kafka Server");
>             producer.close();
>           });
> }
>
> conf = {
>            "bootstrap.servers": "localhost:9092",
>            "acks": "all",
>            "linger.ms": "1000",
>            "batch.size": "100000",
>            "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
>            "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
>        }
>
> "linger.ms" and "batch.size" are successfully set in Producer Config 
> (verified via startup logs) and are being honored.
>
> ========================================================================================================================
>
>
> Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size"
> not honored by the Kafka connector?
>
> Regards,
> Abhishek Singla
>
>
> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com>
> wrote:
>
>> If you are using a broadcast to construct a producer with a set of
>> options, then the producer is simply going to operate as it is
>> configured; it has nothing to do with Spark, save that foreachPartition
>> constructs it via the broadcast.
>>
>> Strategies I’ve used in the past:
>> * increase the memory pool for asynchronous processing
>> * create multiple broadcast producers and pick one at random per send,
>> to balance asynchronous sends across more thread pools
>> * implement back pressure via an adapter class to capture errors (a
>> sketch of the last two points follows below)
>>
>> These are the same things you would want to consider when writing any
>> high-volume Kafka-based application.
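>>
>> A rough sketch of the second and third points (illustrative only; the
>> class name and structure are my assumptions, not an existing API):
>>
>> import java.util.Properties;
>> import java.util.concurrent.ThreadLocalRandom;
>> import java.util.concurrent.atomic.AtomicLong;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> class ProducerPool implements AutoCloseable {
>>   private final KafkaProducer<String, String>[] producers;
>>   private final AtomicLong errors = new AtomicLong();  // back-pressure signal
>>
>>   @SuppressWarnings("unchecked")
>>   ProducerPool(int size, Properties props) {
>>     producers = new KafkaProducer[size];
>>     for (int i = 0; i < size; i++) producers[i] = new KafkaProducer<>(props);
>>   }
>>
>>   // Random routing spreads async sends across the producers' IO threads;
>>   // the callback is the adapter that captures per-record errors.
>>   void send(ProducerRecord<String, String> rec) {
>>     producers[ThreadLocalRandom.current().nextInt(producers.length)]
>>         .send(rec, (md, ex) -> { if (ex != null) errors.incrementAndGet(); });
>>   }
>>
>>   long errorCount() { return errors.get(); }
>>
>>   @Override
>>   public void close() {
>>     for (KafkaProducer<String, String> p : producers) p.close();
>>   }
>> }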
>>
>>
>>
>> -dan
>>
>>
>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
>> abhisheksingla...@gmail.com> wrote:
>>
>>> Yes, producing via kafka-clients using foreachPartition works as
>>> expected. The Kafka producer is initialised within the
>>> call(Iterator<T> t) method.
>>>
>>> The issue is with using the Kafka connector in Spark batch. The configs
>>> are not honored even though they are set in ProducerConfig. This means
>>> the record production rate cannot be controlled via the Kafka connector
>>> in Spark batch. That can cause in-sync replicas to lag if they cannot
>>> catch up, and eventually the Kafka server will fail writes once the
>>> in-sync replica count drops below the required min.insync.replicas. Is
>>> there any way to solve this using the Kafka connector?
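>>>
>>> One partial mitigation that occurs to me (a sketch, not a confirmed
>>> fix): cap write parallelism before the save, which bounds the number
>>> of concurrent producer tasks even if linger.ms/batch.size are ignored:
>>>
>>> // Fewer write partitions => fewer concurrent producers => lower peak rate.
>>> dataset.repartition(4)   // 4 is illustrative; tune to the cluster
>>>     .write().format("kafka").options(conf).save();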
>>>
>>> Regards,
>>> Abhishek Singla
>>>
>>
