I am trying to read .csv.gz files from S3 and write them back to S3 in Parquet format through a Spark job. I am currently using AWS EMR for this, and the Spark job runs as a step on the EMR cluster. For some .csv.gz files I encounter the issue below. I have tried both Spark 3.4 and 3.5 and still get the same error.
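For reference, the relevant part of the job is essentially a plain CSV read followed by a Parquet write. The sketch below is a simplified, hypothetical version of that step (the input path, the header option, and the object name are placeholders; the real job has additional validation logic around the read):

```scala
import org.apache.spark.sql.SparkSession

object CsvGzToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-gz-to-parquet")
      .getOrCreate()

    // Spark/Hadoop decompress .csv.gz transparently; the stack trace below shows
    // the splittable gzip codec (nl.basjes SplittableGzipCodec) reading the splits.
    val df = spark.read
      .option("header", "true")                // placeholder option
      .csv("s3://<S3 Bucket>/input/*.csv.gz")  // placeholder input path

    df.write
      .mode("overwrite")
      .parquet("s3://<S3 Bucket>/TEMP/")       // temporary output location referenced in the errors below

    spark.stop()
  }
}
```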
```
Exception in thread "main" java.lang.Exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 63 in stage 4.0 failed 4 times, most recent failure: Lost task 63.3 in stage 4.0 (TID 71) (ip-100-68-72-50.880639474967.ase1.aws.dev.r53 executor 4): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://<S3 Bucket>/TEMP/.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.EOFException: Unexpected end of input stream
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
    at nl.basjes.hadoop.io.compress.SplittableGzipCodec$SplittableGzipInputStream.read(SplittableGzipCodec.java:468)
    at java.base/java.io.InputStream.read(InputStream.java:218)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:132)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:163)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:200)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:239)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:404)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
    ... 15 more

Driver stacktrace:
    at com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205)
    at com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22)
    at com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21)
    at com.lseg.lsa.pipeline.refinitiv.mp.stat.doInitialDataframeValidation(MPStat.scala:11)
    at com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction(DataIngestion.scala:11)
    at com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction$(DataIngestion.scala:9)
    at com.lseg.lsa.pipeline.mp.stat.dataIngestionFunction(MPStat.scala:11)
    at Main$.main(Main.scala:11)
    at Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1066)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1158)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

As a workaround for the issue above I applied the Spark configurations below (a short sketch of how they are set follows the list), but this led to another issue in the Spark job, shown further down.

- spark.hadoop.fs.s3a.connection.maximum=1000
- spark.hadoop.fs.s3a.connection.timeout=60000
- spark.hadoop.fs.s3a.attempts.maximum=20
- spark.hadoop.fs.s3a.multipart.size=104857600
- spark.task.maxFailures=8
- spark.speculation=true
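For reference, a minimal sketch of applying these settings on the SparkSession builder (the exact mechanism in the job may differ; the same keys can equally be passed as --conf options to spark-submit or in the EMR step arguments):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the configuration keys listed above, set when the session is created.
val spark = SparkSession.builder()
  .appName("csv-gz-to-parquet")
  .config("spark.hadoop.fs.s3a.connection.maximum", "1000")
  .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
  .config("spark.hadoop.fs.s3a.attempts.maximum", "20")
  .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
  .config("spark.task.maxFailures", "8")
  .config("spark.speculation", "true")
  .getOrCreate()
```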
With these settings in place, the job fails with the following error instead:

```
Exception in thread "main" java.lang.Exception: org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=4, partition=25) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED] Task failed while writing rows to s3://<S3 Bucket>/TEMP/..,[Ljava.lang.StackTraceElement;@4f391a0f,org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://<S3 Bucket>/TEMP/..
    at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: G6ZF85DQRRCK239S; S3 Extended Request ID: McDcoY3cKrTeQ/hD5hWUK4kO9FwgK7LHyMv2aFyQfVXgl02tPqpGDpPLULgVdb51gu2D57egOQ6swOc4XSCzTQ==; Proxy: null), S3 Extended Request ID: McDcoY3cKrTeQ/hD5hWUK4kO9FwgK7LHyMv2aFyQfVXgl02tPqpGDpPLULgVdb51gu2D57egOQ6swOc4XSCzTQ==
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5516)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5463)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3667)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:26)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:12)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:114)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:141)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:196)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.completeMultipartUpload(AmazonS3LiteClient.java:170)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at jdk.proxy2/jdk.proxy2.$Proxy47.completeMultipartUpload(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.completeUpload(DefaultMultipartUploadDispatcher.java:57)
    at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.complete(DefaultMultipartUploadDispatcher.java:40)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.completeUpload(InMemoryStagingDirectory.java:227)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.publish(InMemoryStagingDirectory.java:109)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.publish(SynchronizedStagingDirectory.java:52)
    at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.publishStagingDirectory(InMemoryStagingMetadataStore.java:96)
    at com.amazon.ws.emr.hadoop.fs.staging.DefaultStagingMechanism.publishStagingDirectory(DefaultStagingMechanism.java:76)
    at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.publishStagingDirectory(FileSystemOptimizedCommitter.java:149)
    at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.commitTask(FileSystemOptimizedCommitter.java:138)
    at com.amazon.emr.committer.FilterParquetOutputCommitter.commitTask(FilterParquetOutputCommitter.java:119)
    at com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter.commitTask(EmrOptimizedSparkSqlParquetOutputCommitter.java:9)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:233)
    at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
    ... 15 more
,Some(org.apache.spark.ThrowableSerializationWrapper@6546722d),Vector(AccumulableInfo(200,None,Some(445114),None,false,true,None), AccumulableInfo(202,None,Some(0),None,false,true,None), AccumulableInfo(203,None,Some(1889),None,false,true,None), AccumulableInfo(229,None,Some(3150792478),None,false,true,None), AccumulableInfo(230,None,Some(5475045),None,false,true,None)),Vector(LongAccumulator(id: 200, name: Some(internal.metrics.executorRunTime), value: 445114), LongAccumulator(id: 202, name: Some(internal.metrics.resultSize), value: 0), LongAccumulator(id: 203, name: Some(internal.metrics.jvmGCTime), value: 1889), LongAccumulator(id: 229, name: Some(internal.metrics.input.bytesRead), value: 3150792478), LongAccumulator(id: 230, name: Some(internal.metrics.input.recordsRead), value: 5475045)),WrappedArray(1724393088, 157016336, 0, 0, 1000665, 0, 1000665, 0, 30218252, 0, 0, 0, 0, 0, 0, 0, 500, 1934, 0, 0, 1934))
    at com.lseg.lsa.pipeline.core.MIngestionUtilities$.validateInitialDataframe(MIngestionUtilities.scala:205)
    at com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation(DataIngestion.scala:22)
    at com.lseg.lsa.pipeline.common.DataIngestion.doInitialDataframeValidation$(DataIngestion.scala:21)
    at com.lseg.lsa.pipeline.refinitiv.mp.stat.doInitialDataframeValidation(MPStat.scala:11)
    at com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction(DataIngestion.scala:11)
    at com.lseg.lsa.pipeline.common.DataIngestion.dataIngestionFunction$(DataIngestion.scala:9)
    at com.lseg.lsa.pipeline.refinitiv.mp.stat.dataIngestionFunction(MPStat.scala:11)
    at Main$.main(Main.scala:11)
    at Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1066)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1158)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

I would appreciate any support with the issues above.

Thank you,
Best regards,
Nipuna Shantha