Thanks! I completely missed that in the docs. It's now working; however, it
does not work with compression writers. Someone else reported the same issue here:

https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming

Looking at the code, I'm not sure I follow the nuances of why sync()
doesn't simply call flush() in RefCountedBufferingFileStream:

    public void sync() throws IOException {
        throw new UnsupportedOperationException(
            "S3RecoverableFsDataOutputStream cannot sync state to S3. " +
            "Use persist() to create a persistent recoverable intermediate point.");
    }

If there are any pointers here on what should happen, happy to submit a
patch.
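
To make the question concrete, here is a minimal, Flink-free sketch of the
behaviour I am asking about. Everything in it is hypothetical: ThrowingStream
and FlushingStream are stand-ins I made up (they are not Flink classes), and
finishWriter assumes that a writer calls sync() on its stream when it
finishes, which I have not confirmed is what the compression writers do.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class SyncSketch {

    // Hypothetical stand-in for a buffering stream whose sync() always throws,
    // mirroring the method quoted above. Not a Flink class.
    static class ThrowingStream extends ByteArrayOutputStream {
        public void sync() throws IOException {
            throw new UnsupportedOperationException("cannot sync; use persist()");
        }
    }

    // Hypothetical variant where sync() only flushes the local buffer.
    static class FlushingStream extends ThrowingStream {
        @Override
        public void sync() throws IOException {
            flush();
        }
    }

    // Assumption: a writer writes, flushes, then calls sync() when it finishes.
    // Returns true if finishing succeeded, false if sync() was rejected.
    static boolean finishWriter(ThrowingStream out) {
        try {
            out.write("payload".getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.sync();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(finishWriter(new ThrowingStream())); // false
        System.out.println(finishWriter(new FlushingStream())); // true
    }
}
```

Note that a flush-only sync() would not provide the durability that sync()
normally implies, which may be exactly why the real method throws.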




On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com> wrote:

> Dan,
>
> The first point you've raised is a known issue: When a job is stopped, the
> unfinished part files are not transitioned to the finished state. This is
> mentioned in the docs as Important Note 2 [1], and fixing this is waiting
> on FLIP-46 [2]. That section of the docs also includes some S3-specific
> warnings, but nothing pertaining to managing credentials. Perhaps [3] will
> help.
>
> Regards,
> David
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>
>
> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>
>> First, let me say, Flink is super cool - thanks everyone for making my
>> life easier in a lot of ways! Wish I had this 10 years ago....
>>
>> Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
>> Note that Flink is embedded in my app, not running as a standalone cluster.
>>
>> I am having a few problems, which I have illustrated in the small test
>> case below.
>>
>> 1) After my job finishes, data never gets committed to S3. Looking
>> through the code, I've noticed that data gets flushed to disk, but the
>> multi-part upload is never finished. Even though my data doesn't hit the
>> min part size, I would expect that if my job ends, my data should get
>> uploaded since the job is 100% done.
>>
>> I am also having problems with data not being uploaded while the job is
>> running, but I haven't been able to distill that down to a simple test
>> case, so I thought I'd start here.
>>
>> 2) The S3 Filesystem does not pull credentials from the Flink
>> Configuration when running in embedded mode. I have a workaround for this,
>> but it is ugly. If you comment out the line in the test case which talks
>> about this workaround, you will end up with a "java.net.SocketException:
>> Host is down".
>>
>> Can anyone shed light on these two issues? Thanks!
>>
>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.core.fs.FileSystem;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> import org.junit.jupiter.api.Test;
>>
>> public class S3Test {
>>     @Test
>>     public void whyDoesntThisWork() throws Exception {
>>         Configuration configuration = new Configuration();
>>         configuration.setString("state.backend",
>>                 MemoryStateBackendFactory.class.getName());
>>         configuration.setString("s3.access.key", "****");
>>         configuration.setString("s3.secret.key", "****");
>>
>>         // If I don't do this, the S3 filesystem never gets the credentials
>>         FileSystem.initialize(configuration, null);
>>
>>         LocalStreamEnvironment env =
>>                 StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>
>>         StreamingFileSink<String> s3 = StreamingFileSink
>>                 .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
>>                 .build();
>>
>>         env.fromElements("string1", "string2")
>>             .addSink(s3);
>>
>>         env.execute();
>>
>>         System.out.println("Done");
>>     }
>> }
>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 
Dan Diephouse
@dandiep
