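You are getting a DiskChecker$DiskErrorException error when no new records are published to Kafka for a few days. The error indicates that the application could not find a valid local directory in which to create a temporary file. In this trace the file is an S3A upload buffer block (the s3ablock-0001- file named in the error). By default, S3AFileSystem <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html> buffers data on local disk before uploading it to S3. Here it is doing so while the driver writes the next entry of the checkpoint offset log (OffsetSeqLog.add, called from MicroBatchExecution).

This might be due to any of these:

- Removed temporary directories: if no records are published to Kafka for a prolonged period, a cleanup process on the node might have removed the local temporary directories used for writing batch data. One example is a scheduled job that purges old files under /tmp, which is where the S3A buffer directory lives by default. When processing resumes, a new temporary file is needed, and the error occurs because the directory is missing or because of insufficient space or permission issues (see below).
- Limited local disk space: ensure there is sufficient free space on the node running the driver, where this trace was raised, and on the worker nodes where Spark executors run.
- Incorrectly configured spark.local.dir: verify that the spark.local.dir property in your Spark configuration points to a writable directory with enough space. Note that S3A takes its buffer directory from the Hadoop property fs.s3a.buffer.dir, which can be set from Spark as spark.hadoop.fs.s3a.buffer.dir, so check that one as well.
- Permission issues: check that the directories listed in spark.local.dir and fs.s3a.buffer.dir are readable and writable by the user running Spark.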
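As a quick diagnostic, the small Java program below checks a comma-separated list of directories, such as the resolved value of spark.local.dir or fs.s3a.buffer.dir. It approximates the conditions under which Hadoop's LocalDirAllocator (top of the trace) would skip a directory: the directory is missing, it is not writable, or it does not report enough free space for the block being written. It is a hypothetical helper (LocalDirCheck and its methods are my own names). It is a simplified sketch rather than Hadoop's actual logic, and the 64 MB block size is an assumption for illustration only.

```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical diagnostic helper, not part of Spark or Hadoop. It approximates the
// reasons a local directory gets skipped when a temporary block file is allocated.
public final class LocalDirCheck {

    private LocalDirCheck() {
    }

    // True if dir exists, is a writable directory and reports more than minFreeBytes usable.
    public static boolean isUsable(Path dir, long minFreeBytes) {
        File f = dir.toFile();
        // File.getUsableSpace() returns 0 for a path that does not name a partition,
        // e.g. a directory that has been deleted, so it also fails the free-space test.
        return f.isDirectory() && f.canWrite() && f.getUsableSpace() > minFreeBytes;
    }

    // Returns the first usable entry of a comma-separated list such as "/data1/tmp,/data2/tmp".
    public static Optional<Path> firstUsable(String commaSeparatedDirs, long minFreeBytes) {
        for (String entry : commaSeparatedDirs.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore blanks left by stray commas
            }
            Path dir = Paths.get(trimmed);
            if (isUsable(dir, minFreeBytes)) {
                return Optional.of(dir);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Pass the resolved value of spark.local.dir or fs.s3a.buffer.dir as the first argument;
        // defaults to the JVM temp directory when no argument is given.
        String dirs = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        long blockSize = 64L * 1024 * 1024; // assumed 64 MB block size, illustration only
        Optional<Path> dir = firstUsable(dirs, blockSize);
        System.out.println(dir.map(p -> "usable: " + p)
                .orElse("no valid local directory among: " + dirs));
    }
}
```

Run it on the driver host and on the workers, passing the directory list your configuration actually resolves to. If it reports no valid directory, first try recreating the directory or pointing the property at a location that no cleanup job touches.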
HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Tue, 13 Feb 2024 at 22:06, Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Could someone provide some insights into this issue?
>
> Regards,
> Abhishek Singla
>
> On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Version: 3.2.2
>> Java Version: 1.8.0_211
>> Scala Version: 2.12.15
>> Cluster: Standalone
>>
>> I am using Spark Streaming to read from Kafka and write to S3. The job
>> fails with the error below if no records are published to Kafka for a few
>> days and then some records are published. Could someone help me
>> identify the root cause of this job failure?
>>
>> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 0919e548-9706-4757-be94-359848100070] terminated with error
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
>>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>>     at org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>>     at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>>     at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>>     at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>>     at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>>     at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>>     at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>>     at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>     at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>>     at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>>     at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>>     at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>>     at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>     at scala.Option.getOrElse(Option.scala:189)
>>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>>     at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>>     at org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>>     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>     at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>>
>> Dataset<Row> df =
>>     spark
>>         .readStream()
>>         .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
>>         .options(appConfig.getKafka().getConf())
>>         .load()
>>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>>     .option("checkpointLocation", appConfig.getChk().getPath())
>>     .start()
>>     .awaitTermination();
>>
>>
>> Regards,
>> Abhishek Singla