@Abhishek Singla <abhisheksingla...@gmail.com> The kafka.* options are for streaming applications, not batch. This is clearly documented on the Structured Streaming pages for the Kafka integration. If you are transmitting data in a batch application, it's best to do so via a foreachPartition operation, as discussed, leveraging broadcast variables to control your KafkaProducers.
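A minimal sketch of what that can look like, assuming a batch Dataset<Row> serialized to JSON and a hypothetical topic "events". KafkaProducer itself is not serializable, so the broadcast carries the producer config and each partition constructs its own producer from it:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BatchKafkaWriter {
  public static void write(SparkSession spark, Dataset<Row> dataset) {
    Map<String, String> conf = new HashMap<>();
    // plain client keys here, no "kafka." prefix: this producer is configured
    // directly, not through the Spark connector
    conf.put("bootstrap.servers", "localhost:9092");
    conf.put("acks", "all");
    conf.put("linger.ms", "1000");
    conf.put("batch.size", "100000");
    conf.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    conf.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    Broadcast<Map<String, String>> bConf = jsc.broadcast(conf);

    dataset.toJSON().foreachPartition(
        (ForeachPartitionFunction<String>) rows -> {
          Properties props = new Properties();
          props.putAll(bConf.value());
          // one producer per partition; linger.ms/batch.size apply as usual
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (rows.hasNext()) {
              producer.send(new ProducerRecord<>("events", rows.next()));
            }
            producer.flush(); // drain any lingering batches before the task ends
          }
        });
  }
}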
On Thu, Apr 17, 2025 at 4:27 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:

@daniel williams <daniel.willi...@gmail.com>
It's batch, not streaming. I still don't understand why "kafka.linger.ms" and "kafka.batch.size" are not being honored by the Kafka connector.

@rommelhol...@gmail.com <rommelhol...@gmail.com>
How did you fix it?

@Jungtaek Lim <kabhwan.opensou...@gmail.com>
Could you help out here in case I am missing something?

Regards,
Abhishek Singla

On Thu, Apr 17, 2025 at 6:25 AM daniel williams <daniel.willi...@gmail.com> wrote:

The contract between the two is a dataset, yes; but did you construct the former via readStream? If not, it's still batch.

-dan

On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:

Implementation via Kafka Connector

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>3.1.2</version>
</dependency>

private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
    dataset.write().format("kafka").options(conf).save();
}

conf = {
    "kafka.bootstrap.servers": "localhost:9092",
    "kafka.acks": "all",
    "topic": "test-topic",
    "kafka.linger.ms": "10000",
    "kafka.batch.size": "10000000"
}

"kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer Config (verified via startup logs), but they are not being honored.

========================================================================================================================

Implementation via Kafka Client

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.3</version>
</dependency>

private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
    dataset.foreachPartition(
        (ForeachPartitionFunction<Row>)
            rows -> {
              Properties props = new Properties();
              props.putAll(conf);
              // conf uses ByteArraySerializer for key and value, so the
              // producer is typed over byte[]
              KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
              int partitionCount = producer.partitionsFor("test-topic").size();
              System.out.println("Partition count: " + partitionCount);

              Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
              int counter = 0;
              ObjectMapper mapper = new ObjectMapper();
              while (rows.hasNext()) {
                Row row = rows.next();
                int partitionId = counter % partitionCount;
                String value = mapper.writeValueAsString(row.get(0));
                Future<RecordMetadata> future =
                    producer.send(
                        new ProducerRecord<>(
                            "test-topic",
                            partitionId,
                            null,
                            value.getBytes(java.nio.charset.StandardCharsets.UTF_8)));
                lastRecordMetadata.put(partitionId, future);
                System.out.println("Sent message: " + null + " -> " + value);
                counter++;
              }

              // wait for the last record sent to each partition to be acknowledged
              lastRecordMetadata.forEach(
                  (key, value) -> {
                    try {
                      RecordMetadata metadata = value.get();
                      System.out.println("Ack Received: " + metadata.toString());
                    } catch (Exception e) {
                      e.printStackTrace();
                    }
                  });

              System.out.println("All messages acknowledged by Kafka Server");
              producer.close();
            });
}

conf = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    "linger.ms": "1000",
    "batch.size": "100000",
    "key.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
    "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
}

"linger.ms" and "batch.size" are successfully set in Producer Config (verified via startup logs) and are being honored.

========================================================================================================================
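Worth noting: startup logs only show that the config was parsed; whether batching actually happens can be checked at runtime from the producer's own metrics. A minimal sketch, assuming kafka-clients 2.x and the standard producer-metrics group; printBatchingMetrics is a hypothetical helper to call before producer.close():

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerMetricsCheck {
  // hypothetical helper: print the observed average batch size (bytes) and
  // records per request, which reflect whether linger.ms/batch.size took effect
  public static void printBatchingMetrics(KafkaProducer<?, ?> producer) {
    for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
      MetricName name = entry.getKey();
      if ("producer-metrics".equals(name.group())
          && ("batch-size-avg".equals(name.name())
              || "records-per-request-avg".equals(name.name()))) {
        System.out.println(name.name() + " = " + entry.getValue().metricValue());
      }
    }
  }
}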
"org.apache.kafka.common.serialization.ByteArraySerializer" >>> } >>> >>> "linger.ms" and "batch.size" are successfully set in Producer Config >>> (verified via startup logs) and are being honored. >>> >>> ======================================================================================================================== >>> >>> >>> Now, the question is why "kafka.linger.ms" and "kafka.batch.size" are not >>> being honored by kafka connector? >>> >>> Regards, >>> Abhishek Singla >>> >>> >>> On Wed, Apr 16, 2025 at 7:19 PM daniel williams < >>> daniel.willi...@gmail.com> wrote: >>> >>>> If you are building a broadcast to construct a producer with a set of >>>> options then the producer is merely going to operate how it’s going to be >>>> configured - it has nothing to do with spark save that the foreachPartition >>>> is constructing it via the broadcast. >>>> >>>> A strategy I’ve used in the past is to >>>> * increase memory pool for asynchronous processing >>>> * make multiple broadcast producers and randomly access the producer to >>>> balance the asynchronous sending across more thread pools >>>> * implement back pressure via an adapter class to capture errors >>>> >>>> These are the same things you would want to consider while writing a >>>> high volume Kafka based application >>>> >>>> >>>> >>>> -dan >>>> >>>> >>>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla < >>>> abhisheksingla...@gmail.com> wrote: >>>> >>>>> Yes, producing via kafka-clients using foreachPartition works as >>>>> expected. Kafka Producer is initialised within call(Iterator<T> t) >>>>> method. >>>>> >>>>> The issue is with using kafka connector with Spark batch. The configs >>>>> are not being honored even when they are being set in ProducerConfig. This >>>>> means kafka records production rate cannot be controlled via kafka >>>>> connector in Spark batch. This can lead to lag in in-sync replicas if they >>>>> are not able to catch up and eventually kafka server failing writes it >>>>> in-sync replicas count reduced the required in-sync replicas. Is there any >>>>> way to solve this using kafka connector? >>>>> >>>>> Regards, >>>>> Abhishek Singla >>>>> >>>> -- -dan