0
<https://stackoverflow.com/posts/79138442/timeline>

I have a spark dataframe which I am writing to Kafka topic using the below
script.

val ProducerConfig: Map[String, String] = Map(
    s"kafka.${ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG}" ->
Integer.MAX_VALUE.toString,
    s"kafka.${ProducerConfig.RETRIES_CONFIG}" -> Integer.MAX_VALUE.toString,
    s"kafka.${ProducerConfig.MAX_BLOCK_MS_CONFIG}" -> Long.MaxValue.toString,
    s"kafka.${ProducerConfig.ACKS_CONFIG}" -> "all",
    s"kafka.${ProducerConfig.MAX_REQUEST_SIZE_CONFIG}" ->
(100*1024*1024).toString // 100 mb
  )
df.write.format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .options(ProducerConfig)
    .option("topic", topic)
    .save()

My DataFrame is static and has 500k records. The above script is dumping
all the records to the topic at once. I want a rate limiter where in 10
seconds only 20k messages should be written to the topic. So each batch
ingestion should wait at least 10 seconds after writing messages to the
topic.

One approach is to divide the DataFrame in chunks and dump it in batches
with a sleep time of 10 seconds. I need an alternative approach which
should be config driven and i do not have to slice my data. Is there a way
we can ingest the data in batches with some waiting time?

Reply via email to