Implementation via Kafka Connector

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>3.1.2</version>
</dependency>
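For context, the connector forwards every option carrying the "kafka."
prefix to the underlying producer with the prefix stripped (per the Spark
Kafka integration docs). A minimal sketch of that convention - illustrative
only, not the connector's actual source:

Map<String, Object> producerParams = new HashMap<>();
for (Map.Entry<String, String> e : conf.entrySet()) {
    if (e.getKey().startsWith("kafka.")) {
        // "kafka.linger.ms" -> "linger.ms", "kafka.batch.size" -> "batch.size"
        producerParams.put(e.getKey().substring("kafka.".length()), e.getValue());
    }
}

This is why the prefixed options in the conf below do show up in the
ProducerConfig logged at startup.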
private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
  // Batch write to Kafka via the spark-sql-kafka connector.
  dataset.write().format("kafka").options(conf).save();
}

conf = {
  "kafka.bootstrap.servers": "localhost:9092",
  "kafka.acks": "all",
  "topic": "test-topic",
  "kafka.linger.ms": "10000",
  "kafka.batch.size": "10000000"
}

"kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer
Config (verified via startup logs) but they are not being honored.

========================================================================================================================

Implementation via Kafka Client

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.3</version>
</dependency>

private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
  dataset.foreachPartition(
      (ForeachPartitionFunction<Row>) rows -> {
        Properties props = new Properties();
        props.putAll(conf);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        int partitionCount = producer.partitionsFor("test-topic").size();
        System.out.println("Partition count: " + partitionCount);

        // Remember only the last send per partition; waiting on it implies
        // all earlier sends to that partition completed (per-partition ordering).
        Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
        int counter = 0;
        ObjectMapper mapper = new ObjectMapper();
        while (rows.hasNext()) {
          Row row = rows.next();
          int partitionId = counter % partitionCount; // round-robin across partitions
          String value = mapper.writeValueAsString(row.get(0));
          Future<RecordMetadata> future =
              producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
          lastRecordMetadata.put(partitionId, future);
          System.out.println("Sent message: null -> " + value);
          counter++;
        }

        lastRecordMetadata.forEach(
            (partition, future) -> {
              try {
                RecordMetadata metadata = future.get(); // block until acked
                System.out.println("Ack Received: " + metadata.toString());
              } catch (Exception e) {
                e.printStackTrace();
              }
            });
        System.out.println("All messages acknowledged by Kafka Server");
        producer.close();
      });
}

conf = {
  "bootstrap.servers": "localhost:9092",
  "acks": "all",
  "linger.ms": "1000",
  "batch.size": "100000",
  "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
}

"linger.ms" and "batch.size" are successfully set in Producer Config
(verified via startup logs) and are being honored.

========================================================================================================================

Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size" not
honored by the Kafka connector?

Regards,
Abhishek Singla

On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com>
wrote:

> If you are building a broadcast to construct a producer with a set of
> options, then the producer is merely going to operate the way it is
> configured - it has nothing to do with Spark, save that foreachPartition
> is constructing it via the broadcast.
>
> A strategy I've used in the past is to
> * increase the memory pool for asynchronous processing
> * create multiple broadcast producers and pick one at random to balance
>   the asynchronous sending across more thread pools
> * implement back pressure via an adapter class to capture errors
>
> These are the same things you would want to consider when writing any
> high-volume Kafka-based application.
>
> -dan
>
>
> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Yes, producing via kafka-clients using foreachPartition works as
>> expected.
>> The Kafka Producer is initialised within the call(Iterator<T> t) method.
>>
>> The issue is with using the Kafka connector with Spark batch. The
>> configs are not being honored even though they are set in
>> ProducerConfig. This means the record production rate cannot be
>> controlled via the Kafka connector in Spark batch. That can lead to lag
>> in the in-sync replicas if they cannot catch up, and eventually to the
>> Kafka server failing writes once the in-sync replica count drops below
>> the required min.insync.replicas. Is there any way to solve this using
>> the Kafka connector?
>>
>> Regards,
>> Abhishek Singla
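Following up on Daniel's suggestions above, here is a minimal sketch of the
foreachPartition pattern with a broadcast config, a small producer pool,
and crude back pressure via a bounded in-flight count. Everything here is
illustrative: publishWithBackpressure, the pool size, and maxInFlight are
made-up names and values to be tuned, and the broadcast conf is assumed to
already contain StringSerializer settings for key.serializer and
value.serializer.

import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class ThrottledKafkaWriter {

  // Broadcast only the (serializable) config; the producers themselves are
  // created on the executors inside foreachPartition.
  static void publishWithBackpressure(
      Dataset<Row> dataset, Broadcast<HashMap<String, Object>> confBroadcast) {
    dataset.foreachPartition(
        (ForeachPartitionFunction<Row>) rows -> {
          Properties props = new Properties();
          props.putAll(confBroadcast.value());

          // Hypothetical pool size; tune per executor core count.
          int poolSize = 4;
          @SuppressWarnings("unchecked")
          KafkaProducer<String, String>[] pool = new KafkaProducer[poolSize];
          for (int i = 0; i < poolSize; i++) {
            pool[i] = new KafkaProducer<>(props);
          }

          // Crude back pressure: at most maxInFlight unacknowledged sends.
          int maxInFlight = 1000; // assumption; tune for your brokers
          Semaphore inFlight = new Semaphore(maxInFlight);
          AtomicReference<Exception> firstError = new AtomicReference<>();

          while (rows.hasNext()) {
            Row row = rows.next();
            String value = String.valueOf(row.get(0));
            inFlight.acquire(); // blocks the task when brokers fall behind
            // A random producer spreads load across sender threads.
            KafkaProducer<String, String> producer =
                pool[ThreadLocalRandom.current().nextInt(poolSize)];
            producer.send(
                new ProducerRecord<>("test-topic", value),
                (metadata, exception) -> {
                  if (exception != null) {
                    firstError.compareAndSet(null, exception);
                  }
                  inFlight.release();
                });
            if (firstError.get() != null) {
              throw firstError.get(); // fail the task fast on broker errors
            }
          }

          for (KafkaProducer<String, String> p : pool) {
            p.close(); // flushes any outstanding records
          }
          if (firstError.get() != null) {
            throw firstError.get();
          }
        });
  }
}

Blocking on the semaphore throttles the task instead of letting
unacknowledged records pile up in producer memory, which addresses the
ISR-lag concern when batching configs alone cannot be relied on.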