You are getting a DiskChecker$DiskErrorException when no new records are
published to Kafka for a few days. The error indicates that the Spark
application could not find a valid local directory in which to create
temporary files for data processing. This might be due to any of the following:
- Stale temporary directories: if no records are published to Kafka for a
prolonged period, the S3 partition cleanup logic, enabled by default in
S3AFileSystem
<https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html>,
may have removed the temporary directories used for writing batch data. When
processing resumes, a new temporary directory is needed, and the error occurs
if it cannot be created due to insufficient space or permission issues (see
below).
- Limited local disk space: ensure sufficient free space on the worker
nodes where Spark executors are running.
- Incorrectly configured spark.local.dir: verify that the spark.local.dir
property in your Spark configuration points to a writable directory with
enough space (a configuration sketch follows this list).
- Permission issues: check that the directories listed in spark.local.dir are
accessible to the Spark user with read/write permissions.
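
As a starting point, below is a minimal sketch (in Java, matching your job) of
how these properties could be set when building the SparkSession. The paths
/data/spark/tmp and /data/spark/s3a are placeholders for illustration only;
point them at local directories on your nodes that exist, are writable by the
Spark user, and have enough free space. fs.s3a.buffer.dir is the S3A property
that controls where blocks are buffered locally before upload.

import org.apache.spark.sql.SparkSession;

// Minimal sketch: point Spark's scratch space and the S3A upload buffer
// at local directories known to be writable and to have enough free space.
// The paths below are placeholders; replace them with real directories.
SparkSession spark = SparkSession.builder()
    .appName("kafka-to-s3-streaming")
    // local scratch directory Spark uses for temporary/shuffle files
    .config("spark.local.dir", "/data/spark/tmp")
    // local directory S3A uses to buffer blocks before uploading to S3
    .config("spark.hadoop.fs.s3a.buffer.dir", "/data/spark/s3a")
    .getOrCreate();

In a standalone cluster the same settings can also go in
conf/spark-defaults.conf (or SPARK_LOCAL_DIRS in spark-env.sh) so that every
worker picks them up. Note that your stack trace fails while writing the
streaming offset log (HDFSMetadataLog), which runs on the driver, so the
buffer directory needs to be valid on the driver host as well.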

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 22:06, Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

> Hi Team,
>
> Could someone provide some insights into this issue?
>
> Regards,
> Abhishek Singla
>
> On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Version: 3.2.2
>> Java Version: 1.8.0_211
>> Scala Version: 2.12.15
>> Cluster: Standalone
>>
>> I am using Spark Streaming to read from Kafka and write to S3. The job
>> fails with below error if there are no records published to Kafka for a few
>> days and then there are some records published. Could someone help me in
>> identifying the root cause of this job failure.
>>
>> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
>> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
>> 0919e548-9706-4757-be94-359848100070] terminated with error
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
>> valid local directory for s3ablock-0001-
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>>      at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>>      at 
>> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>>      at 
>> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>>      at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>>      at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>>      at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>      at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>>      at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>>      at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>>      at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>>      at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>>      at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>      at scala.Option.getOrElse(Option.scala:189)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>>      at 
>> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>>      at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>>
>> Dataset<Row> df =
>>     spark
>>         .readStream()
>>         .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
>>         .options(appConfig.getKafka().getConf())
>>         .load()
>>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>>     .option("checkpointLocation", appConfig.getChk().getPath())
>>     .start()
>>     .awaitTermination();
>>
>>
>> Regards,
>> Abhishek Singla
>>
>>
>>
>>
>>
>>
