Hi Team,
I am using Spark Structured Streaming to read from Kafka and write to S3.

Spark version: 3.1.2
Scala version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12
Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();
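
For context, KafkaS3PipelineImplementation is the foreachBatch sink that writes
each micro-batch to S3. A simplified sketch of it (the bucket path, Parquet
format, and the AppConfig type are placeholders, not the exact implementation):

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Simplified foreachBatch sink: writes each micro-batch to S3.
public class KafkaS3PipelineImplementation implements VoidFunction2<Dataset<Row>, Long> {

    private final String applicationId;
    private final AppConfig appConfig; // our own config holder (placeholder here)

    public KafkaS3PipelineImplementation(String applicationId, AppConfig appConfig) {
        this.applicationId = applicationId;
        this.appConfig = appConfig;
    }

    @Override
    public void call(Dataset<Row> batchDf, Long batchId) {
        // Placeholder path and format: the real sink writes the batch to our S3 bucket.
        batchDf.write()
            .mode("append")
            .parquet("s3a://<bucket>/" + applicationId + "/batch_id=" + batchId);
    }
}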
kafka.conf = {
"kafka.bootstrap.servers": "localhost:9092",
"subscribe": "test-topic",
"minOffsetsPerTrigger": 10000000,
"maxOffsetsPerTrigger": 11000000,
"maxTriggerDelay": "15m",
"groupIdPrefix": "test",
"startingOffsets": "latest",
"includeHeaders": true,
"failOnDataLoss": false
}
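
To spell out how these options reach the reader (values inlined for clarity;
appConfig.getKafka().getConf() is our own wrapper that returns roughly this map):

import java.util.HashMap;
import java.util.Map;

// kafka.conf as string key/value pairs, since DataStreamReader.options(Map<String, String>)
// expects string values.
Map<String, String> kafkaOptions = new HashMap<>();
kafkaOptions.put("kafka.bootstrap.servers", "localhost:9092");
kafkaOptions.put("subscribe", "test-topic");
kafkaOptions.put("minOffsetsPerTrigger", "10000000");
kafkaOptions.put("maxOffsetsPerTrigger", "11000000");
kafkaOptions.put("maxTriggerDelay", "15m");
kafkaOptions.put("groupIdPrefix", "test");
kafkaOptions.put("startingOffsets", "latest");
kafkaOptions.put("includeHeaders", "true");
kafkaOptions.put("failOnDataLoss", "false");

// Equivalent to the .options(appConfig.getKafka().getConf()) call in the snippet above:
// spark.readStream().format("kafka").options(kafkaOptions).load()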
spark.conf = {
"spark.master": "spark://localhost:7077",
"spark.app.name": "app",
"spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
"spark.sql.streaming.metricsEnabled": true
}
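
The session itself is created from spark.conf; in simplified form:

import org.apache.spark.sql.SparkSession;

// SparkSession built with the spark.conf entries above (simplified).
SparkSession spark = SparkSession.builder()
        .master("spark://localhost:7077")
        .appName("app")
        .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
        .config("spark.sql.streaming.metricsEnabled", "true")
        .getOrCreate();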
However, these settings do not seem to take effect: I can see Spark processing
micro-batches of roughly 3k-15k records immediately one after another, rather
than waiting for at least 10M offsets (minOffsetsPerTrigger) or for the 15-minute
maxTriggerDelay before triggering. Is there something I am missing?
Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Regards,
Abhishek Singla