[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356524#comment-16356524 ]
chris snow commented on FLINK-8543:
-----------------------------------

One thing to note: I've used the standard Flink 1.4.0 download for Hadoop 2.7, but I am running on a Hortonworks-based Hadoop environment. I'm not sure whether this has any impact, but it seemed worth mentioning.

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8543
>                 URL: https://issues.apache.org/jira/browse/FLINK-8543
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.4.0
>         Environment: IBM Analytics Engine -
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following
> components are made available:
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2
> Jupyter Enterprise Gateway 0.5.0
> HBase 1.1.2 *
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 *
> Tez 0.7.0 *
> Pig 0.16.0 *
> Sqoop 1.4.6 *
> Slider 0.92.0 *
>            Reporter: chris snow
>            Priority: Blocker
>             Fix For: 1.5.0
>
>         Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!
> The Flink console output shows an exception being thrown by S3AOutputStream,
> so I've grabbed the S3AOutputStream class from my cluster and added some
> additional logging to the checkOpen() method to log the 'key' just before
> the exception is thrown:
>
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
>
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
>
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
>             Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
>             (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(
>             new FileOutputStream(this.backupFile));
>     }
>
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vvvvvv-- Additional logging --vvvvvvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload",
>             (Object)this.key);
>         try {
>             ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload = this.fs.putObject(
>                 this.fs.newPutObjectRequest(this.key, om, this.backupFile));
>             ProgressableProgressListener listener = new ProgressableProgressListener(
>                 this.fs, this.key, upload, this.progress);
>             upload.addProgressListener((ProgressListener)listener);
>             upload.waitForUploadResult();
>             listener.uploadCompleted();
>             this.fs.finishedWrite(this.key);
>         }
>         catch (InterruptedException e) {
>             throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e);
>         }
>         catch (AmazonClientException e) {
>             throw S3AUtils.translateException("saving output", this.key, e);
>         }
>         finally {
>             if (!this.backupFile.delete()) {
>                 LOG.warn("Could not delete temporary s3a file: {}",
>                     (Object)this.backupFile);
>             }
>             super.close();
>         }
>         LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key);
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b);
>     }
>
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b, off, len);
>     }
>
>     static {
>     }
> }
> {code}
>
> You can see from this additional log output that the S3AOutputStream#close()
> method *appears* to be called before the S3AOutputStream#flush() method:
>
> {code:java}
> 2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - object_delete_requests += 1 -> 3
> vvvvv- close() is called here? -vvvvv
> 2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete
> vvvvv- flush() is called here? -vvvvv
> 2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.
> 2018-02-01 12:42:21,212 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
> 2018-02-01 12:42:21,214 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
> TimerException{java.io.IOException: Output Stream closed}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Output Stream closed
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
>     at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
>     at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
>     at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
>     ... 7 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
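The flush-after-close failure mode in the report can be reproduced outside the cluster with a minimal sketch. MiniCheckedStream and FlushAfterCloseDemo below are hypothetical names, not the real Hadoop or Flink classes; the sketch only mirrors the AtomicBoolean/checkOpen() pattern visible in the decompiled S3AOutputStream: once close() flips the closed flag, any later flush() (as triggered in the stack trace via DataFileWriter.close() -> flush()) throws "Output Stream closed".

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for S3AOutputStream's checkOpen()/close() interaction.
class MiniCheckedStream extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final OutputStream backupStream = new ByteArrayOutputStream();

    void checkOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Output Stream closed");
        }
    }

    @Override
    public void write(int b) throws IOException {
        checkOpen();
        backupStream.write(b);
    }

    @Override
    public void flush() throws IOException {
        checkOpen();            // fails once close() has already run
        backupStream.flush();
    }

    @Override
    public void close() throws IOException {
        if (closed.getAndSet(true)) {
            return;             // a second close() is a no-op, like S3AOutputStream
        }
        backupStream.close();
    }
}

public class FlushAfterCloseDemo {
    public static void main(String[] args) throws IOException {
        MiniCheckedStream s = new MiniCheckedStream();
        s.write(42);
        s.close();              // analogous to the part file being closed first
        try {
            s.flush();          // analogous to the Avro writer flushing afterwards
        } catch (IOException e) {
            System.out.println("IOException: " + e.getMessage());
        }
    }
}
```

Running the demo prints `IOException: Output Stream closed`, matching the exception message in the stack trace above; the real bug is in whichever layer lets flush() reach the stream after close() has run.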