Thanks! I completely missed that in the docs. It's working now; however, it
doesn't work with compression writers. Someone else has noted the same issue
here:
https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
(A sketch of the sort of compression-writer setup in question is in the P.S.
below.)

Looking at the code, I'm not sure I follow the nuances of why sync() doesn't
just delegate to flush() on the underlying RefCountedBufferingFileStream:

    public void sync() throws IOException {
        throw new UnsupportedOperationException(
                "S3RecoverableFsDataOutputStream cannot sync state to S3. "
                        + "Use persist() to create a persistent recoverable intermediate point.");
    }

If there are any pointers on what should happen here, I'm happy to submit a
patch (the rough idea I had is in the P.P.S. below).

On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com> wrote:

> Dan,
>
> The first point you've raised is a known issue: when a job is stopped, the
> unfinished part files are not transitioned to the finished state. This is
> mentioned in the docs as Important Note 2 [1], and fixing it is waiting on
> FLIP-46 [2]. That section of the docs also includes some S3-specific
> warnings, but nothing pertaining to managing credentials. Perhaps [3] will
> help.
>
> Regards,
> David
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>
> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>
>> First, let me say, Flink is super cool - thanks, everyone, for making my
>> life easier in a lot of ways! Wish I had this 10 years ago...
>>
>> Onto the fun stuff: I am attempting to use the StreamingFileSink with
>> S3. Note that Flink is embedded in my app, not running as a standalone
>> cluster.
>>
>> I am having a few problems, which I have illustrated in the small test
>> case below.
>>
>> 1) After my job finishes, data never gets committed to S3. Looking
>> through the code, I've noticed that data gets flushed to disk, but the
>> multipart upload is never completed. Even though my data doesn't reach
>> the minimum part size, I would expect that when my job ends, my data
>> gets uploaded, since the job is 100% done.
>>
>> I am also seeing uploads fail to happen while the job is running, but I
>> haven't been able to distill that down to a simple test case, so I
>> thought I'd start here.
>>
>> 2) The S3 filesystem does not pull credentials from the Flink
>> Configuration when running in embedded mode. I have a workaround for
>> this, but it is ugly. If you comment out the line in the test case that
>> mentions this workaround, you will end up with a
>> "java.net.SocketException: Host is down".
>>
>> Can anyone shed light on these two issues? Thanks!
>>
>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> import org.junit.jupiter.api.Test;
>>
>> public class S3Test {
>>
>>     @Test
>>     public void whyDoesntThisWork() throws Exception {
>>         Configuration configuration = new Configuration();
>>         configuration.setString("state.backend", MemoryStateBackendFactory.class.getName());
>>         configuration.setString("s3.access.key", "****");
>>         configuration.setString("s3.secret.key", "****");
>>
>>         // If I don't do this, the S3 filesystem never gets the credentials
>>         FileSystem.initialize(configuration, null);
>>
>>         LocalStreamEnvironment env =
>>                 StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>
>>         StreamingFileSink<String> s3 = StreamingFileSink
>>                 .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
>>                 .build();
>>
>>         env.fromElements("string1", "string2")
>>                 .addSink(s3);
>>
>>         env.execute();
>>
>>         System.out.println("Done");
>>     }
>> }
>>
>> --
>> Dan Diephouse
>> @dandiep
>
--
Dan Diephouse
@dandiep
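
P.S. For concreteness on the compression side: the kind of writer setup the
SO question above describes looks roughly like the sketch below. This is a
sketch, not my exact code; it assumes the flink-compress module
(CompressWriters / DefaultExtractor) and a Hadoop GzipCodec on the classpath.

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.compress.CompressWriters;
    import org.apache.flink.formats.compress.extractor.DefaultExtractor;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class CompressedS3SinkSketch {

        // Bulk-format sink that gzips each part file before it lands in S3.
        static StreamingFileSink<String> buildSink() throws Exception {
            return StreamingFileSink
                    .forBulkFormat(
                            new Path("s3://bucket/"),
                            CompressWriters.forExtractor(new DefaultExtractor<String>())
                                    .withHadoopCompression("GzipCodec"))
                    .build();
        }
    }

Note that bulk formats can only roll part files on checkpoints, unlike the
row-format sink in my test case, so checkpointing has to be enabled for
anything to finalize.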
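
P.P.S. Regarding sync(): the naive change I was picturing is below. Purely a
sketch of my question, not a proposed fix -- 'fileStream' stands for the
underlying RefCountedBufferingFileStream, and I may well be missing the
reason this is unsafe (flushing only moves buffered bytes to the local
staging file; it makes nothing durable in S3, which is perhaps exactly why
sync() throws today).

    @Override
    public void sync() throws IOException {
        // Push any buffered bytes down to the local staging part file.
        fileStream.flush();
        // An fsync of the local file might also be wanted here; I'm not sure
        // RefCountedBufferingFileStream exposes a safe way to do that.
    }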