@daniel williams <daniel.willi...@gmail.com> It's batch, not streaming. I still don't understand why "kafka.linger.ms" and "kafka.batch.size" are not being honored by the Kafka connector.
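For reference, the documented contract is that writer options prefixed with "kafka." are handed to the underlying KafkaProducer with the prefix stripped (so "kafka.linger.ms" becomes "linger.ms"). A minimal sketch of that batch write path, for comparison; the broker address, topic name, and to_json payload are placeholders, not taken from this thread:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class KafkaConnectorWriteSketch {

  // The Kafka sink reads the payload from a "value" column (string or binary);
  // "key" and "topic" columns are optional. Options prefixed with "kafka."
  // are passed to the producer with the prefix stripped.
  static void writeBatch(Dataset<Row> dataset) {
    dataset
        .select(to_json(struct(col("*"))).alias("value")) // placeholder payload
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
        .option("topic", "test-topic")                       // placeholder topic
        .option("kafka.linger.ms", "10000")
        .option("kafka.batch.size", "10000000")
        .save();
  }
}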
@rommelhol...@gmail.com <rommelhol...@gmail.com> How did you fix it?

@Jungtaek Lim <kabhwan.opensou...@gmail.com> Could you help out here in case I am missing something?

Regards,
Abhishek Singla

On Thu, Apr 17, 2025 at 6:25 AM daniel williams <daniel.willi...@gmail.com> wrote:

> The contract between the two is a dataset, yes; but did you construct the
> former via readStream? If not, it's still batch.
>
> -dan
>
> On Wed, Apr 16, 2025 at 4:54 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>
>> Implementation via Kafka Connector
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
>>   <version>3.1.2</version>
>> </dependency>
>>
>> private static void publishViaKafkaConnector(Dataset<Row> dataset, Map<String, String> conf) {
>>   dataset.write().format("kafka").options(conf).save();
>> }
>>
>> conf = {
>>   "kafka.bootstrap.servers": "localhost:9092",
>>   "kafka.acks": "all",
>>   "topic": "test-topic",
>>   "kafka.linger.ms": "10000",
>>   "kafka.batch.size": "10000000"
>> }
>>
>> "kafka.linger.ms" and "kafka.batch.size" are successfully set in Producer
>> Config (verified via startup logs) but they are not being honored.
>>
>> ========================================================================
>>
>> Implementation via Kafka Client
>>
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>2.6.3</version>
>> </dependency>
>>
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.Properties;
>> import java.util.concurrent.Future;
>>
>> import com.fasterxml.jackson.databind.ObjectMapper;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>> import org.apache.kafka.clients.producer.RecordMetadata;
>> import org.apache.spark.api.java.function.ForeachPartitionFunction;
>>
>> private static void publishViaKafkaClient(Dataset<Row> dataset, Map<String, String> conf) {
>>   dataset.foreachPartition(
>>       (ForeachPartitionFunction<Row>)
>>           rows -> {
>>             Properties props = new Properties();
>>             props.putAll(conf);
>>             KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>>             int partitionCount = producer.partitionsFor("test-topic").size();
>>             System.out.println("Partition count: " + partitionCount);
>>
>>             // track the last future per partition so we can block until each
>>             // partition's final record is acknowledged
>>             Map<Integer, Future<RecordMetadata>> lastRecordMetadata = new HashMap<>();
>>             int counter = 0;
>>             ObjectMapper mapper = new ObjectMapper();
>>             while (rows.hasNext()) {
>>               Row row = rows.next();
>>               int partitionId = counter % partitionCount;
>>               String value = mapper.writeValueAsString(row.get(0));
>>               Future<RecordMetadata> future =
>>                   producer.send(new ProducerRecord<>("test-topic", partitionId, null, value));
>>               lastRecordMetadata.put(partitionId, future);
>>               System.out.println("Sent message to partition " + partitionId + ": " + value);
>>               counter++;
>>             }
>>
>>             lastRecordMetadata.forEach(
>>                 (partition, future) -> {
>>                   try {
>>                     RecordMetadata metadata = future.get();
>>                     System.out.println("Ack received: " + metadata);
>>                   } catch (Exception e) {
>>                     e.printStackTrace();
>>                   }
>>                 });
>>
>>             System.out.println("All messages acknowledged by Kafka server");
>>             producer.close();
>>           });
>> }
>>
>> conf = {
>>   "bootstrap.servers": "localhost:9092",
>>   "acks": "all",
>>   "linger.ms": "1000",
>>   "batch.size": "100000",
>>   // StringSerializer, to match KafkaProducer<String, String> above
>>   "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
>>   "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
>> }
>>
>> "linger.ms" and "batch.size" are successfully set in Producer Config
>> (verified via startup logs) and are being honored.
>>
>> ========================================================================
>>
>> Now, the question is why "kafka.linger.ms" and "kafka.batch.size" are not
>> being honored by the kafka connector?
>>
>> Regards,
>> Abhishek Singla
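Startup logs only prove the configs were accepted, not that batching happens. One runtime check, sketched here against the kafka-clients path above, is to read the producer's own metrics; "batch-size-avg" and "records-per-request-avg" are standard producer metric names, and where to call this (e.g. just before producer.close()) is an assumption:

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class BatchingCheckSketch {

  // If linger.ms/batch.size are taking effect, batch-size-avg should sit well
  // above the size of a single record, and records-per-request-avg above 1.
  static void printBatchingMetrics(KafkaProducer<?, ?> producer) {
    for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
      String name = entry.getKey().name();
      if (name.equals("batch-size-avg") || name.equals("records-per-request-avg")) {
        System.out.println(name + " = " + entry.getValue().metricValue());
      }
    }
  }
}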
>> On Wed, Apr 16, 2025 at 7:19 PM daniel williams <daniel.willi...@gmail.com> wrote:
>>
>>> If you are building a broadcast to construct a producer with a set of
>>> options, then the producer is merely going to operate how it's
>>> configured - it has nothing to do with Spark, save that the
>>> foreachPartition is constructing it via the broadcast.
>>>
>>> A strategy I've used in the past is to:
>>> * increase the memory pool for asynchronous processing
>>> * make multiple broadcast producers and randomly access the producer to
>>> balance the asynchronous sending across more thread pools
>>> * implement back pressure via an adapter class to capture errors
>>>
>>> These are the same things you would want to consider while writing any
>>> high-volume Kafka-based application.
>>>
>>> -dan
>>>
>>> On Wed, Apr 16, 2025 at 7:17 AM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>>>
>>>> Yes, producing via kafka-clients using foreachPartition works as
>>>> expected. The Kafka producer is initialised within the call(Iterator<T> t)
>>>> method.
>>>>
>>>> The issue is with using the kafka connector with Spark batch. The configs
>>>> are not being honored even though they are set in ProducerConfig. This
>>>> means the Kafka record production rate cannot be controlled via the kafka
>>>> connector in Spark batch. This can lead to in-sync replicas lagging if they
>>>> cannot catch up, and eventually to the Kafka server failing writes once the
>>>> in-sync replica count drops below the required min.insync.replicas. Is
>>>> there any way to solve this using the kafka connector?
>>>>
>>>> Regards,
>>>> Abhishek Singla
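To make the strategy dan describes concrete, here is one possible shape of the multiple-producers-plus-back-pressure idea, sketched against kafka-clients; every class and method name below is illustrative, not an established API:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical pool: a few producers per executor JVM, picked at random so
// asynchronous sends spread across more I/O threads and record buffers.
public class ProducerPool implements Serializable {

  private final HashMap<String, Object> conf; // producer configs (serializable)
  private final int size;
  private transient List<KafkaProducer<String, String>> pool; // built lazily per executor

  public ProducerPool(Map<String, Object> conf, int size) {
    this.conf = new HashMap<>(conf);
    this.size = size;
  }

  public synchronized KafkaProducer<String, String> pick() {
    if (pool == null) {
      pool = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        pool.add(new KafkaProducer<>(conf));
      }
    }
    return pool.get(ThreadLocalRandom.current().nextInt(size));
  }
}

// Hypothetical back-pressure adapter: remembers the first asynchronous send
// failure so the task can fail fast instead of silently dropping records.
class FirstErrorCallback implements Callback {

  private final AtomicReference<Exception> firstError = new AtomicReference<>();

  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      firstError.compareAndSet(null, exception);
    }
  }

  public void rethrowIfFailed() throws Exception {
    Exception error = firstError.get();
    if (error != null) {
      throw error;
    }
  }
}

Inside foreachPartition one would broadcast the ProducerPool, call pick() once per partition, pass a FirstErrorCallback to producer.send(record, callback), and call rethrowIfFailed() before finishing the partition.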