Implementation via Kafka Connector

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>3.1.2</version>
</dependency>

private static void publishViaKafkaConnector(Dataset<Row> dataset,
                                             Map<String, String> conf) {
  dataset.write().format("kafka").options(conf).save();
}
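
Note that the built-in Kafka sink writes the dataset's "value" column
(string or binary) as the record value, with optional "key", "topic",
and "partition" columns, so a dataset with arbitrary columns has to be
projected first. A minimal sketch, assuming the rows should be
serialized to JSON:

// Sketch: project the dataset into the shape the Kafka sink expects.
// The "value" column is mandatory; "key" is optional (the topic here
// comes from the "topic" option in conf instead of a column).
Dataset<Row> kafkaReady =
    dataset.select(functions.to_json(functions.struct(functions.col("*")))
        .alias("value"));
publishViaKafkaConnector(kafkaReady, conf);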

conf = {
           "kafka.bootstrap.servers": "localhost:9092",
           "kafka.acks": "all",
           "topic": "test-topic",
           "kafka.linger.ms": "10000",
           "kafka.batch.size": "10000000"
       }

"kafka.linger.ms" and "kafka.batch.size" are successfully set in
Producer Config (verified via startup logs) but they are not being
honored.
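
For reference, the connector strips the "kafka." prefix before handing
options to the underlying producer (the "topic" option is consumed by
the connector itself), so the map above corresponds to a plain producer
configured roughly like this:

// Equivalent producer settings once the "kafka." prefix is removed.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // kafka.bootstrap.servers
props.put(ProducerConfig.ACKS_CONFIG, "all");                         // kafka.acks
props.put(ProducerConfig.LINGER_MS_CONFIG, "10000");                  // kafka.linger.ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "10000000");              // kafka.batch.size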

========================================================================================================================

Implementation via Kafka Client

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.3</version>
</dependency>

private static void publishViaKafkaClient(Dataset<Row> dataset,
                                          Map<String, String> conf) {
  dataset.foreachPartition(
      (ForeachPartitionFunction<Row>)
          rows -> {
            Properties props = new Properties();
            props.putAll(conf);
            // try-with-resources ensures the producer is closed (and its
            // buffer flushed) even if serialization or sending fails
            try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(props)) {
              int partitionCount = producer.partitionsFor("test-topic").size();
              System.out.println("Partition count: " + partitionCount);

              // Track only the last in-flight send per partition; awaiting
              // these futures suffices because callbacks for records sent to
              // the same partition complete in order.
              Map<Integer, Future<RecordMetadata>> lastRecordMetadata =
                  new HashMap<>();
              int counter = 0;
              ObjectMapper mapper = new ObjectMapper();
              while (rows.hasNext()) {
                Row row = rows.next();
                int partitionId = counter % partitionCount;
                String value = mapper.writeValueAsString(row.get(0));
                Future<RecordMetadata> future =
                    producer.send(
                        new ProducerRecord<>("test-topic", partitionId, null, value));
                lastRecordMetadata.put(partitionId, future);
                System.out.println("Sent message: key=null -> " + value);
                counter++;
              }

              lastRecordMetadata.forEach(
                  (partition, future) -> {
                    try {
                      RecordMetadata metadata = future.get();
                      System.out.println("Ack Received: " + metadata);
                    } catch (Exception e) {
                      e.printStackTrace();
                    }
                  });

              System.out.println("All messages acknowledged by Kafka Server");
            }
          });
}
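
For completeness, a hypothetical driver-side invocation (the input path
and SparkSession setup are assumptions, not from the original code):

SparkSession spark =
    SparkSession.builder().appName("kafka-publish").getOrCreate();
Dataset<Row> dataset = spark.read().json("/path/to/input.json"); // hypothetical input
publishViaKafkaClient(dataset, conf);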

conf = {
           "bootstrap.servers": "localhost:9092",
           "acks": "all",
           "linger.ms": "1000",
           "batch.size": "100000",
           "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
           "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
       }

"linger.ms" and "batch.size" are successfully set in Producer Config
(verified via startup logs) and are being honored.
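
One way to confirm batching on the client side (a sketch, not part of
the original test) is to read the producer's built-in metrics after the
send loop, before closing the producer:

// With linger.ms/batch.size in effect, batch-size-avg and
// records-per-request-avg should show multi-record batches.
producer.metrics().forEach((name, metric) -> {
  if (name.name().equals("batch-size-avg")
      || name.name().equals("records-per-request-avg")) {
    System.out.println(name.name() + " = " + metric.metricValue());
  }
});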

========================================================================================================================


Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size"
not being honored by the Kafka connector?

Regards,
Abhishek Singla


On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com>
wrote:

> If you are using a broadcast to construct a producer with a set of
> options, then the producer is simply going to operate as it is
> configured - it has nothing to do with Spark, save that the
> foreachPartition is constructing it via the broadcast.
>
> A strategy I've used in the past is to:
> * increase the memory pool for asynchronous processing
> * make multiple broadcast producers and randomly pick one per send to
> balance the asynchronous sending across more thread pools (see the
> sketch after this thread)
> * implement back pressure via an adapter class to capture errors
>
> These are the same things you would want to consider when writing any
> high-volume Kafka-based application.
>
>
>
> -dan
>
>
> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Yes, producing via kafka-clients using foreachPartition works as
>> expected. The Kafka producer is initialised within the
>> call(Iterator<T> t) method.
>>
>> The issue is with using the Kafka connector in Spark batch. The configs
>> are not being honored even though they are set in the ProducerConfig.
>> This means the record production rate cannot be controlled via the
>> Kafka connector in Spark batch. That can cause in-sync replicas to lag
>> if they cannot catch up, and eventually the Kafka server to fail writes
>> once the in-sync replica count drops below the required minimum. Is
>> there any way to solve this using the Kafka connector?
>>
>> Regards,
>> Abhishek Singla
>>
>
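
As a rough illustration of the multi-producer idea suggested above (a
sketch, not code from the thread; all names are hypothetical):

import java.util.*;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;

// Hypothetical pool that spreads sends across several producers, so each
// producer's sender thread and buffer pool carries a fraction of the load.
public final class ProducerPool implements AutoCloseable {
  private final List<KafkaProducer<String, String>> producers = new ArrayList<>();
  private final Random random = new Random();

  public ProducerPool(Map<String, String> conf, int size) {
    Properties props = new Properties();
    props.putAll(conf);
    for (int i = 0; i < size; i++) {
      producers.add(new KafkaProducer<>(props));
    }
  }

  // Randomly pick a producer per send to balance load across sender threads.
  public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
    return producers.get(random.nextInt(producers.size())).send(record);
  }

  @Override
  public void close() {
    producers.forEach(KafkaProducer::close);
  }
}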
