The contract between the two is a dataset, yes; but did you construct the former via readStream? If not, it's still batch.

-dan
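For contrast with the batch write shown in the quoted mail below, here is a minimal sketch of what the streaming path looks like, assuming an existing SparkSession `spark`, an input schema `inputSchema`, and placeholder paths and topic names:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    // Dataset constructed via readStream => the Kafka sink runs as a streaming query,
    // not a one-shot batch write. Paths, schema, and topic are placeholders.
    Dataset<Row> stream = spark.readStream()
        .format("json")
        .schema(inputSchema)               // file sources require an explicit schema
        .load("/data/input");              // hypothetical input path

    StreamingQuery query = stream
        .selectExpr("to_json(struct(*)) AS value")            // Kafka sink expects a 'value' column
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "test-topic")
        .option("checkpointLocation", "/tmp/chk/kafka-sink")  // required for streaming sinks
        .start();

    query.awaitTermination();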
On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>
> Implementation via Kafka Connector
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>   <version>3.1.2</version>
> </dependency>
>
> private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
>   dataset.write().format("kafka").options(conf).save();
> }
>
> conf = {
>   "kafka.bootstrap.servers": "localhost:9092",
>   "kafka.acks": "all",
>   "topic": "test-topic",
>   "kafka.linger.ms": "10000",
>   "kafka.batch.size": "10000000"
> }
>
> "kafka.linger.ms" and "kafka.batch.size" are successfully set in ProducerConfig (verified via startup logs), but they are not being honored.
>
> ========================================================================
>
> Implementation via Kafka Client
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.6.3</version>
> </dependency>
>
> private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
>   dataset.foreachPartition(
>       (ForeachPartitionFunction<Row>)
>           rows -> {
>             Properties props = new Properties();
>             props.putAll(conf);
>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>             int partitionCount = producer.partitionsFor("test-topic").size();
>             System.out.println("Partition count: " + partitionCount);
>
>             // Track the last send per partition so its acknowledgement can be awaited below.
>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>             int counter = 0;
>             ObjectMapper mapper = new ObjectMapper();
>             while (rows.hasNext()) {
>               Row row = rows.next();
>               int partitionId = counter % partitionCount;
>               String value = mapper.writeValueAsString(row.get(0));
>               Future<RecordMetadata> future =
>                   producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
>               lastRecordMetadata.put(partitionId, future);
>               System.out.println("Sent message: " + null + " -> " + value);
>               counter++;
>             }
>
>             lastRecordMetadata.forEach(
>                 (key, value) -> {
>                   try {
>                     RecordMetadata metadata = value.get();
>                     System.out.println("Ack Received: " + metadata.toString());
>                   } catch (Exception e) {
>                     e.printStackTrace();
>                   }
>                 });
>
>             System.out.println("All messages acknowledged by Kafka Server");
>             producer.close();
>           });
> }
>
> conf = {
>   "bootstrap.servers": "localhost:9092",
>   "acks": "all",
>   "linger.ms": "1000",
>   "batch.size": "100000",
>   "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
>   "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
> }
>
> "linger.ms" and "batch.size" are successfully set in ProducerConfig (verified via startup logs) and are being honored.
>
> ========================================================================
>
> Now, the question is: why are "kafka.linger.ms" and "kafka.batch.size" not being honored by the Kafka connector?
>
> Regards,
> Abhishek Singla
>
>
> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com> wrote:
>
>> If you are building a broadcast to construct a producer with a set of options, then the producer is merely going to operate the way it is configured - it has nothing to do with Spark, save that the foreachPartition is constructing it via the broadcast.
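For reference, a minimal sketch of the broadcast approach described in that reply, assuming the `dataset` and `conf` from the mail above (with string serializers) and an existing SparkSession `spark`; the options map is broadcast once and each partition builds its own producer from it, so whatever linger.ms and batch.size values are in the map are exactly what that producer uses:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.ForeachPartitionFunction;
    import org.apache.spark.broadcast.Broadcast;

    // Broadcast the producer options once; each partition constructs its own producer from them.
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    Broadcast<Map<String, String>> bConf = jsc.broadcast(new HashMap<>(conf));

    dataset.toJSON().foreachPartition(
        (ForeachPartitionFunction<String>) rows -> {
          Properties props = new Properties();
          props.putAll(bConf.value());   // linger.ms, batch.size, serializers, etc. as provided
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (rows.hasNext()) {
              producer.send(new ProducerRecord<>("test-topic", rows.next()));
            }
            producer.flush();            // block until all outstanding sends are acknowledged
          }
        });

The cast to ForeachPartitionFunction is needed in Java to disambiguate the foreachPartition overloads, as in the kafka-clients example above.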
>>
>> A strategy I've used in the past is to:
>> * increase the memory pool for asynchronous processing
>> * make multiple broadcast producers and randomly pick a producer, to balance the asynchronous sending across more thread pools
>> * implement back pressure via an adapter class to capture errors
>>
>> These are the same things you would want to consider while writing any high-volume Kafka-based application.
>>
>> -dan
>>
>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>
>>> Yes, producing via kafka-clients using foreachPartition works as expected. The Kafka producer is initialised within the call(Iterator<T> t) method.
>>>
>>> The issue is with using the Kafka connector with Spark batch. The configs are not being honored even though they are set in ProducerConfig. This means the Kafka record production rate cannot be controlled via the Kafka connector in Spark batch. This can lead to lag on in-sync replicas if they cannot catch up, and eventually to the Kafka server failing writes once the in-sync replica count drops below the required minimum. Is there any way to solve this using the Kafka connector?
>>>
>>> Regards,
>>> Abhishek Singla
>>
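As an illustration of the back-pressure point in the strategy list above, here is a minimal sketch of such an adapter, with hypothetical names (BackpressureProducer, maxInFlight); it caps the number of unacknowledged sends with a semaphore and surfaces the first asynchronous send failure instead of dropping it:

    import java.util.Properties;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical adapter: caps in-flight sends and records the first async failure.
    public class BackpressureProducer implements AutoCloseable {
      private final KafkaProducer<String, String> producer;
      private final Semaphore inFlight;
      private final AtomicReference<Exception> firstError = new AtomicReference<>();

      public BackpressureProducer(Properties props, int maxInFlight) {
        this.producer = new KafkaProducer<>(props);
        this.inFlight = new Semaphore(maxInFlight);
      }

      public void send(String topic, String value) throws Exception {
        Exception err = firstError.get();
        if (err != null) throw err;       // fail fast once a send has failed
        inFlight.acquire();               // blocks while maxInFlight sends are pending
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
          if (exception != null) firstError.compareAndSet(null, exception);
          inFlight.release();
        });
      }

      @Override
      public void close() throws Exception {
        producer.flush();                 // wait for outstanding sends
        producer.close();
        Exception err = firstError.get();
        if (err != null) throw err;
      }
    }

Inside foreachPartition, each partition would construct one of these from the (broadcast) properties, call send per row, and close it at the end of the partition so that any send failure fails the Spark task rather than being swallowed.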