FYI - I discovered that if I specify the Hadoop compression codec it works fine. E.g.:

CompressWriters.forExtractor(new DefaultExtractor()).withHadoopCompression("GzipCodec")

Haven't dug into exactly why yet.
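For context, here is roughly how that factory wires into the sink (a minimal sketch rather than my exact code; the output path and the String element type are placeholders, and the statement assumes it sits in a method that can throw, like the test below):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Bulk-format sink whose part files are written through the Hadoop GzipCodec.
// "s3://bucket/output" is just an example path.
StreamingFileSink<String> sink = StreamingFileSink
        .forBulkFormat(
                new Path("s3://bucket/output"),
                CompressWriters.forExtractor(new DefaultExtractor<String>())
                        .withHadoopCompression("GzipCodec"))
        .build();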
On Wed, Oct 7, 2020 at 12:14 PM David Anderson <da...@alpinegizmo.com> wrote:

> Looping in @Kostas Kloudas <kklou...@apache.org> who should be able to
> clarify things.
>
> David
>
> On Wed, Oct 7, 2020 at 7:12 PM Dan Diephouse <d...@netzooid.com> wrote:
>
>> Thanks! Completely missed that in the docs. It's now working; however,
>> it's not working with compression writers. Someone else noted this issue
>> here:
>>
>> https://stackoverflow.com/questions/62138635/flink-streaming-compression-not-working-using-amazon-aws-s3-connector-streaming
>>
>> Looking at the code, I'm not sure I follow the nuances of why sync()
>> doesn't just call flush() in RefCountedBufferingFileStream:
>>
>> public void sync() throws IOException {
>>     throw new UnsupportedOperationException("S3RecoverableFsDataOutputStream cannot sync state to S3. " +
>>         "Use persist() to create a persistent recoverable intermediate point.");
>> }
>>
>> If there are any pointers here on what should happen, happy to submit a patch.
>>
>> On Wed, Oct 7, 2020 at 1:37 AM David Anderson <da...@alpinegizmo.com> wrote:
>>
>>> Dan,
>>>
>>> The first point you've raised is a known issue: when a job is stopped,
>>> the unfinished part files are not transitioned to the finished state. This
>>> is mentioned in the docs as Important Note 2 [1], and fixing it is
>>> waiting on FLIP-46 [2]. That section of the docs also includes some
>>> S3-specific warnings, but nothing pertaining to managing credentials.
>>> Perhaps [3] will help.
>>>
>>> Regards,
>>> David
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#general
>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/filesystems/s3.html#configure-access-credentials
>>>
>>> On Wed, Oct 7, 2020 at 5:53 AM Dan Diephouse <d...@netzooid.com> wrote:
>>>
>>>> First, let me say, Flink is super cool - thanks everyone for making my
>>>> life easier in a lot of ways! Wish I had this 10 years ago....
>>>>
>>>> On to the fun stuff: I am attempting to use the StreamingFileSink with
>>>> S3. Note that Flink is embedded in my app, not running as a standalone
>>>> cluster.
>>>>
>>>> I am having a few problems, which I have illustrated in the small test
>>>> case below.
>>>>
>>>> 1) After my job finishes, data never gets committed to S3. Looking
>>>> through the code, I've noticed that data gets flushed to disk, but the
>>>> multi-part upload is never completed. Even though my data doesn't reach
>>>> the minimum part size, I would expect that when my job ends, my data gets
>>>> uploaded, since the job is 100% done.
>>>>
>>>> I am also having problems with data not uploading while the job is
>>>> running, but I haven't been able to distill that down to a simple test
>>>> case, so I thought I'd start here.
>>>>
>>>> 2) The S3 filesystem does not pull credentials from the Flink
>>>> Configuration when running in embedded mode. I have a workaround for this,
>>>> but it is ugly. If you comment out the line in the test case that mentions
>>>> this workaround, you will end up with a "java.net.SocketException:
>>>> Host is down".
>>>>
>>>> Can anyone shed light on these two issues? Thanks!
>>>>
>>>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.core.fs.FileSystem;
>>>> import org.apache.flink.core.fs.Path;
>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
>>>> import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>>> import org.junit.jupiter.api.Test;
>>>>
>>>> public class S3Test {
>>>>
>>>>     @Test
>>>>     public void whyDoesntThisWork() throws Exception {
>>>>         Configuration configuration = new Configuration();
>>>>         configuration.setString("state.backend", MemoryStateBackendFactory.class.getName());
>>>>         configuration.setString("s3.access.key", "****");
>>>>         configuration.setString("s3.secret.key", "****");
>>>>
>>>>         // If I don't do this, the S3 filesystem never gets the credentials
>>>>         FileSystem.initialize(configuration, null);
>>>>
>>>>         LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
>>>>
>>>>         StreamingFileSink<String> s3 = StreamingFileSink
>>>>             .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
>>>>             .build();
>>>>
>>>>         env.fromElements("string1", "string2")
>>>>             .addSink(s3);
>>>>
>>>>         env.execute();
>>>>
>>>>         System.out.println("Done");
>>>>     }
>>>> }
>>>>
>>>> --
>>>> Dan Diephouse
>>>> @dandiep
>>>
>>
>> --
>> Dan Diephouse
>> @dandiep
>>

--
Dan Diephouse
@dandiep
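P.S. On issue 2, for anyone who hits the same thing: the approach described in [3] is to put the credentials into flink-conf.yaml rather than building a Configuration in code, e.g.

s3.access-key: <your access key>
s3.secret-key: <your secret key>

That works when Flink reads flink-conf.yaml on startup; the FileSystem.initialize(configuration, null) call in the test above is just the workaround I used because the job runs embedded and assembles its Configuration in code.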