I am trying to read .csv.gz files from S3 and write them back to S3 in Parquet
format through a Spark job. Currently I am using the AWS EMR service for this,
and the Spark job is executed as a step in an EMR cluster. For some .csv.gz
files I have encountered the issue below. I have tried both Spark 3.4 and 3.5
and still get the same error.
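
For context, the job follows the general read/write pattern sketched below. This is only a minimal sketch with placeholder paths, assuming an existing SparkSession named `spark`, not the actual job code; the stack trace below also shows nl.basjes' SplittableGzipCodec being used on the read path.

```scala
// Minimal sketch of the read -> write pattern (placeholder paths, not the actual job).
// Gzip decompression is inferred from the .csv.gz extension on read.
val df = spark.read
  .option("header", "true")
  .csv("s3://<S3 Bucket>/input/*.csv.gz")

// Write the data back to S3 as Parquet (the write stage that fails in the traces below).
df.write
  .mode("overwrite")
  .parquet("s3://<S3 Bucket>/TEMP/")
```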

> > Exception in thread "main" java.lang.Exception:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 63
> in stage 4.0 failed 4 times, most recent failure: Lost task 63.3 in stage
> 4.0 (TID 71) (ip-100-68-72-50.880639474967.ase1.aws.dev.r53 executor 4):
> org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while
> writing rows to s3://<S3 Bucket>/TEMP/.
> > at
> org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
> > at
> org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
> > at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
> > at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
> > at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> > at org.apache.spark.scheduler.Task.run(Task.scala:141)
> > at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> > at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> > at java.base/java.lang.Thread.run(Thread.java:840)
> > Caused by: java.io.EOFException: Unexpected end of input stream
> > at
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
> > at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
> > at
> nl.basjes.hadoop.io.compress.SplittableGzipCodec$SplittableGzipInputStream.read(SplittableGzipCodec.java:468)
> > at java.base/java.io.InputStream.read(InputStream.java:218)
> > at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:132)
> > at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
> > at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
> > at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:163)
> > at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200)
> > at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> > at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
> > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404)
> > at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
> > at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
> > ... 15 more

Driver stacktrace:
> at
> com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21)
> at
> com.lseg.lsa.pipeline.refinitiv.mp.stat.doInitialDataframeValidation(MPStat.scala:11)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction(DataIngestion.scala:11)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction$(DataIngestion.scala:9)
> at com.lseg.lsa.pipeline.mp.stat.dataIngestionFunction(MPStat.scala:11)
> at Main$.main(Main.scala:11)
> at Main.main(Main.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:569)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1066)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1158)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1167)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


As an attempted fix for the above issue I applied the Spark configurations
below (a sketch of how they are set follows the list), but this led to a
second issue in the Spark job, shown further below.

   - spark.hadoop.fs.s3a.connection.maximum=1000
   - spark.hadoop.fs.s3a.connection.timeout=60000
   - spark.hadoop.fs.s3a.attempts.maximum=20
   - spark.hadoop.fs.s3a.multipart.size=104857600
   - spark.task.maxFailures=8
   - spark.speculation=true
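
For completeness, these settings are applied roughly as in the sketch below when building the SparkSession; the app name is a placeholder, and the same values can equivalently be passed as `--conf` flags on the EMR step.

```scala
// Sketch: the same settings applied when building the SparkSession
// (values mirror the list above; the app name is a placeholder).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CsvGzToParquet")
  .config("spark.hadoop.fs.s3a.connection.maximum", "1000")
  .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
  .config("spark.hadoop.fs.s3a.attempts.maximum", "20")
  .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
  .config("spark.task.maxFailures", "8")
  .config("spark.speculation", "true")
  .getOrCreate()
```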


Exception in thread "main" java.lang.Exception:
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Authorized committer (attemptNumber=0, stage=4, partition=25) failed; but
> task commit success, data duplication may happen.
> reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED]
> Task failed while writing rows to s3://<S3
> Bucket>/TEMP/..,[Ljava.lang.StackTraceElement;@4f391a0f,org.apache.spark.SparkException:
> [TASK_WRITE_FAILED] Task failed while writing rows to s3://<S3
> Bucket>/TEMP/..
> at
> org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
> at
> org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
> at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
> at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> at org.apache.spark.scheduler.Task.run(Task.scala:141)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> at java.base/java.lang.Thread.run(Thread.java:840)
> Caused by:
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> One or more of the specified parts could not be found.  The part may not
> have been uploaded, or the specified entity tag may not match the part's
> entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart;
> Request ID: G6ZF85DQRRCK239S; S3 Extended Request ID:
> McDcoY3cKrTeQ/hD5hWUK4kO9FwgK7LHyMv2aFyQfVXgl02tPqpGDpPLULgVdb51gu2D57egOQ6swOc4XSCzTQ==;
> Proxy: null), S3 Extended Request ID:
> McDcoY3cKrTeQ/hD5hWUK4kO9FwgK7LHyMv2aFyQfVXgl02tPqpGDpPLULgVdb51gu2D57egOQ6swOc4XSCzTQ==
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5516)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5463)
> at
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3667)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:26)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:12)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:114)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:141)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:196)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191)
> at
> com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.completeMultipartUpload(AmazonS3LiteClient.java:170)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:569)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
> at jdk.proxy2/jdk.proxy2.$Proxy47.completeMultipartUpload(Unknown Source)
> at
> com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.completeUpload(DefaultMultipartUploadDispatcher.java:57)
> at
> com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.complete(DefaultMultipartUploadDispatcher.java:40)
> at
> com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.completeUpload(InMemoryStagingDirectory.java:227)
> at
> com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.publish(InMemoryStagingDirectory.java:109)
> at
> com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.publish(SynchronizedStagingDirectory.java:52)
> at
> com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.publishStagingDirectory(InMemoryStagingMetadataStore.java:96)
> at
> com.amazon.ws.emr.hadoop.fs.staging.DefaultStagingMechanism.publishStagingDirectory(DefaultStagingMechanism.java:76)
> at
> org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.publishStagingDirectory(FileSystemOptimizedCommitter.java:149)
> at
> org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.commitTask(FileSystemOptimizedCommitter.java:138)
> at
> com.amazon.emr.committer.FilterParquetOutputCommitter.commitTask(FilterParquetOutputCommitter.java:119)
> at
> com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter.commitTask(EmrOptimizedSparkSqlParquetOutputCommitter.java:9)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
> at
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:233)
> at
> org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:129)
> at
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
> at
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:405)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
> ... 15 more
> ,Some(org.apache.spark.ThrowableSerializationWrapper@6546722d),Vector(AccumulableInfo(200,None,Some(445114),None,false,true,None),
> AccumulableInfo(202,None,Some(0),None,false,true,None),
> AccumulableInfo(203,None,Some(1889),None,false,true,None),
> AccumulableInfo(229,None,Some(3150792478),None,false,true,None),
> AccumulableInfo(230,None,Some(5475045),None,false,true,None)),Vector(LongAccumulator(id:
> 200, name: Some(internal.metrics.executorRunTime), value: 445114),
> LongAccumulator(id: 202, name: Some(internal.metrics.resultSize), value:
> 0), LongAccumulator(id: 203, name: Some(internal.metrics.jvmGCTime), value:
> 1889), LongAccumulator(id: 229, name:
> Some(internal.metrics.input.bytesRead), value: 3150792478),
> LongAccumulator(id: 230, name: Some(internal.metrics.input.recordsRead),
> value: 5475045)),WrappedArray(1724393088, 157016336, 0, 0, 1000665, 0,
> 1000665, 0, 30218252, 0, 0, 0, 0, 0, 0, 0, 500, 1934, 0, 0, 1934))
> at
> com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21)
> at
> com.lseg.lsa.pipeline.refinitiv.mp.stat.doInitialDataframeValidation(MPStat.scala:11)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction(DataIngestion.scala:11)
> at
> com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction$(DataIngestion.scala:9)
> at
> com.lseg.lsa.pipeline.refinitiv.mp.stat.dataIngestionFunction(MPStat.scala:11)
> at Main$.main(Main.scala:11)
> at Main.main(Main.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:569)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1066)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1158)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1167)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Any support with the above issues would be greatly appreciated.

Thank You,
Best Regards,
Nipuna Shantha
