I noticed this problem about a year ago; I just didn't pursue it further because of other issues. The workaround is to broadcast the Kafka config and create a producer in each partition.
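A minimal sketch of the config-preparation part of that approach, assuming the same option map that is handed to the connector gets broadcast and is then turned into plain producer properties inside each partition. The two conf maps quoted later in this thread differ only by the "kafka." prefix and the "topic" entry, so the sketch strips the prefix and skips everything else. `ProducerConfigSketch` and `toProducerProperties` are hypothetical names, not Spark or Kafka APIs; the sketch uses only the JDK and leaves out the Spark broadcast and the KafkaProducer itself.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of Spark or Kafka. */
final class ProducerConfigSketch {

    private static final String PREFIX = "kafka.";

    private ProducerConfigSketch() {}

    /**
     * Copies every "kafka."-prefixed option into a Properties object with the
     * prefix removed. Options without the prefix (such as "topic") are skipped.
     */
    static Properties toProducerProperties(Map<String, String> connectorConf) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : connectorConf.entrySet()) {
            String key = entry.getKey();
            if (key.toLowerCase(Locale.ROOT).startsWith(PREFIX)) {
                props.setProperty(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // Same options as the connector conf quoted in this thread.
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("kafka.bootstrap.servers", "localhost:9092");
        conf.put("kafka.acks", "all");
        conf.put("topic", "test-topic");
        conf.put("kafka.linger.ms", "10000");
        conf.put("kafka.batch.size", "10000000");

        Properties props = toProducerProperties(conf);

        System.out.println(props.getProperty("linger.ms"));   // prints 10000
        System.out.println(props.getProperty("batch.size"));  // prints 10000000
        System.out.println(props.containsKey("topic"));       // prints false
    }
}
```

Inside foreachPartition one would still have to add key and value serializers to the resulting properties before constructing the producer, as the kafka-clients conf quoted in this thread does.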
But it would be great if the connector honored the config natively.

On Thu, Apr 17, 2025, 15:27 Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> @daniel williams <daniel.willi...@gmail.com>
> It's batch and not streaming. I still don't understand why
> "kafka.linger.ms" and "kafka.batch.size" are not being honored by the
> Kafka connector.
>
> @rommelhol...@gmail.com <rommelhol...@gmail.com>
> How did you fix it?
>
> @Jungtaek Lim <kabhwan.opensou...@gmail.com>
> Could you help out here in case I am missing something?
>
> Regards,
> Abhishek Singla
>
> On Thu, Apr 17, 2025 at 6:25 AM daniel williams <daniel.willi...@gmail.com>
> wrote:
>
>> The contract between the two is a dataset, yes; but did you construct the
>> former via readStream? If not, it's still batch.
>>
>> -dan
>>
>>
>> On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>
>>> Implementation via Kafka Connector
>>>
>>> <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>>>   <version>3.1.2</version>
>>> </dependency>
>>>
>>> private static void publishViaKafkaConnector(Dataset<Row> dataset,
>>>     Map<String, String> conf) {
>>>   dataset.write().format("kafka").options(conf).save();
>>> }
>>>
>>> conf = {
>>>   "kafka.bootstrap.servers": "localhost:9092",
>>>   "kafka.acks": "all",
>>>   "topic": "test-topic",
>>>   "kafka.linger.ms": "10000",
>>>   "kafka.batch.size": "10000000"
>>> }
>>>
>>> "kafka.linger.ms" and "kafka.batch.size" are successfully set in the
>>> Producer Config (verified via startup logs) but they are not being honored.
>>>
>>> ============================================================
>>>
>>> Implementation via Kafka Client
>>>
>>> <dependency>
>>>   <groupId>org.apache.kafka</groupId>
>>>   <artifactId>kafka-clients</artifactId>
>>>   <version>2.6.3</version>
>>> </dependency>
>>>
>>> private static void publishViaKafkaClient(Dataset<Row> dataset,
>>>     Map<String, String> conf) {
>>>   dataset.foreachPartition(
>>>       (ForeachPartitionFunction<Row>)
>>>           rows -> {
>>>             // One producer per Spark partition, built from the plain client conf.
>>>             Properties props = new Properties();
>>>             props.putAll(conf);
>>>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>>             int partitionCount = producer.partitionsFor("test-topic").size();
>>>             System.out.println("Partition count: " + partitionCount);
>>>
>>>             // Last in-flight send per Kafka partition, awaited after the loop.
>>>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>>>             int counter = 0;
>>>             ObjectMapper mapper = new ObjectMapper();
>>>             while (rows.hasNext()) {
>>>               Row row = rows.next();
>>>               // Round-robin over the topic's partitions.
>>>               int partitionId = counter % partitionCount;
>>>               String value = mapper.writeValueAsString(row.get(0));
>>>               Future<RecordMetadata> future =
>>>                   producer.send(
>>>                       new ProducerRecord<>("test-topic", partitionId, null, value));
>>>               lastRecordMetadata.put(partitionId, future);
>>>               System.out.println("Sent message: " + null + " -> " + value);
>>>               counter++;
>>>             }
>>>
>>>             lastRecordMetadata.forEach(
>>>                 (key, value) -> {
>>>                   try {
>>>                     RecordMetadata metadata = value.get();
>>>                     System.out.println("Ack Received: " + metadata.toString());
>>>                   } catch (Exception e) {
>>>                     e.printStackTrace();
>>>                   }
>>>                 });
>>>
>>>             System.out.println("All messages acknowledged by Kafka Server");
>>>             producer.close();
>>>           });
>>> }
>>>
>>> conf = {
>>>   "bootstrap.servers": "localhost:9092",
>>>   "acks": "all",
>>>   "linger.ms": "1000",
>>>   "batch.size": "100000",
>>>   "key.serializer":
>>>       "org.apache.kafka.common.serialization.StringSerializer",
>>>   "value.serializer":
>>>       "org.apache.kafka.common.serialization.StringSerializer"
>>> }
>>>
>>> "linger.ms" and "batch.size" are successfully set in the Producer Config
>>> (verified via startup logs) and are being honored.
>>>
>>> ============================================================
>>>
>>> Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size"
>>> not being honored by the Kafka connector?
>>>
>>> Regards,
>>> Abhishek Singla
>>>
>>>
>>> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com> wrote:
>>>
>>>> If you are building a broadcast to construct a producer with a set of
>>>> options, then the producer is simply going to operate as it is
>>>> configured. It has nothing to do with Spark, except that the
>>>> foreachPartition is constructing it via the broadcast.
>>>>
>>>> A strategy I've used in the past is to
>>>> * increase the memory pool for asynchronous processing
>>>> * make multiple broadcast producers and randomly access the producer to
>>>>   balance the asynchronous sending across more thread pools
>>>> * implement back pressure via an adapter class to capture errors
>>>>
>>>> These are the same things you would want to consider while writing a
>>>> high-volume Kafka-based application.
>>>>
>>>> -dan
>>>>
>>>>
>>>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>>>
>>>>> Yes, producing via kafka-clients using foreachPartition works as
>>>>> expected. The Kafka producer is initialised within the
>>>>> call(Iterator<T> t) method.
>>>>>
>>>>> The issue is with using the Kafka connector with Spark batch. The
>>>>> configs are not being honored even though they are set in
>>>>> ProducerConfig. This means the Kafka record production rate cannot be
>>>>> controlled via the Kafka connector in Spark batch. This can lead to lag
>>>>> in the in-sync replicas if they are not able to catch up, and
>>>>> eventually to the Kafka server failing writes if the in-sync replica
>>>>> count drops below the required number of in-sync replicas. Is there
>>>>> any way to solve this using the Kafka connector?
>>>>>
>>>>> Regards,
>>>>> Abhishek Singla