FYI - I discovered that if I specify the Hadoop compression codec it works
fine. E.g.:

CompressWriters.forExtractor(new
DefaultExtractor()).withHadoopCompression("GzipCodec")

Haven't dug into exactly why yet.

On Wed, Oct 7, 2020 at 12:14 PM David Anderson <da...@alpinegizmo.com>
wrote:

> Looping in @Kostas Kloudas <kklou...@apache.org> who should be able to
> clarify things.
>
> David
>
> On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse <d...@netzooid.com> wrote:
>
>> Thanks! Completely missed that in the docs. It's now working, however
>> it's not working with compression writers. Someone else noted this issue
>> here:
>>
>>
>> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>>
>> Looking at the code, I'm not sure I follow the nuances of why sync()
>> doesn't just do a call to flush in RefCountedBufferingFileStream:
>>
>> public void sync() throws IOException {
>> throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream
>> cannot sync state to S3. " +
>> "Use persist() to create a persistent recoverable intermediate point.");
>> }
>>
>> If there are any pointers here on what should happen, happy to submit a
>> patch.
>>
>>
>>
>>
>> On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com>
>> wrote:
>>
>>> Dan,
>>>
>>> The first point you've raised is a known issue: When a job is stopped,
>>> the unfinished part files are not transitioned to the finished state. This
>>> is mentioned in the docs as Important Note 2 [1], and fixing this is
>>> waiting on FLIP-46 [2]. That section of the docs also includes some
>>> S3-specific warnings, but nothing pertaining to managing credentials.
>>> Perhaps [3] will help.
>>>
>>> Regards,
>>> David
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>>>
>>>
>>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>>>
>>>> First, let me say, Flink is super cool - thanks everyone for making my
>>>> life easier in a lot of ways! Wish I had this 10 years ago....
>>>>
>>>> Onto the fun stuff: I am attempting to use the StreamingFileSink with
>>>> S3. Note that Flink is embedded in my app, not running as a standalone
>>>> cluster.
>>>>
>>>> I am having a few problems, which I have illustrated in the small test
>>>> case below.
>>>>
>>>> 1) After my job finishes, data never gets committed to S3. Looking
>>>> through the code, I've noticed that data gets flushed to disk, but the
>>>> multi-part upload is never finished. Even though my data doesn't hit the
>>>> min part size, I would expect that if my job ends, my data should get
>>>> uploaded since the job is 100% done.
>>>>
>>>> I am also having problems when the job is running not uploading - but I
>>>> haven't been able to distill that down to a simple test case, so I thought
>>>> I'd start here.
>>>>
>>>> 2) The S3 Filesystem does not pull credentials from the Flink
>>>> Configuration when running in embedded mode. I have a workaround for this,
>>>> but it is ugly. If you comment out the line in the test case which talks
>>>> about this workaround, you will end up with a "Java.net.SocketException:
>>>> Host is down"
>>>>
>>>> Can anyone shed light on these two issues? Thanks!
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.core.fs.FileSystem;
>>>> import org.apache.flink.core.fs.Path;
>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>>>> import
>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>>>> import
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import
>>>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>>> import org.junit.jupiter.api.Test;
>>>>
>>>> public class S3Test {
>>>>     @Test
>>>>     public void whyDoesntThisWork() throws Exception {
>>>>         Configuration configuration = new Configuration();
>>>>         configuration.setString("state.backend",
>>>> MemoryStateBackendFactory.class.getName());
>>>>         configuration.setString("s3.access.key", "****");
>>>>         configuration.setString("s3.secret.key", "****");
>>>>
>>>>         // If I don't do this, the S3 filesystem never gets the
>>>> credentials
>>>>         FileSystem.initialize(configuration, null);
>>>>
>>>>         LocalStreamEnvironment env =
>>>> StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>>>
>>>>         StreamingFileSink<String> s3 = StreamingFileSink
>>>>                 .forRowFormat(new Path("s3://bucket/"), new
>>>> SimpleStringEncoder<String>())
>>>>                 .build();
>>>>
>>>>         env.fromElements("string1", "string2")
>>>>             .addSink(s3);
>>>>
>>>>         env.execute();
>>>>
>>>>         System.out.println("Done");
>>>>     }
>>>> }
>>>>
>>>>
>>>> --
>>>> Dan Diephouse
>>>> @dandiep
>>>>
>>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>
>

-- 
Dan Diephouse
@dandiep

Reply via email to