First, let me say, Flink is super cool - thanks everyone for making my life
easier in a lot of ways! Wish I'd had this 10 years ago...

Onto the fun stuff: I am attempting to use the StreamingFileSink with S3.
Note that Flink is embedded in my app, not running as a standalone cluster.

I am having a few problems, which I have illustrated in the small test case
below.

1) After my job finishes, the data never gets committed to S3. Looking
through the code, I've noticed that the data gets flushed to disk, but the
multipart upload is never completed. Even though my data doesn't reach the
minimum part size, I would expect it to be uploaded once the job ends,
since the job is 100% done.

I am also seeing data fail to upload while the job is still running, but I
haven't been able to distill that down to a simple test case, so I thought
I'd start here.
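
For background on 1): the StreamingFileSink documentation describes part
files moving from in-progress, to pending (once the rolling policy closes
them), to finished, and states that the last step only happens on a
successful checkpoint, so checkpointing needs to be enabled. The sketch
below is a toy, self-contained model of that lifecycle, NOT Flink's API:
the class PartFileLifecycleModel and its methods are hypothetical names
(snapshotState/notifyCheckpointComplete merely echo Flink's checkpoint
callbacks), and it assumes no checkpoint completes before the job ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model (NOT Flink's API) of the part-file lifecycle:
 * in-progress -> pending -> finished, where "finished" is only reached
 * when a checkpoint completes.
 */
public class PartFileLifecycleModel {
    private final List<String> inProgress = new ArrayList<>();    // records in the open part file
    private final List<List<String>> pending = new ArrayList<>(); // closed, not yet snapshotted
    private final TreeMap<Long, List<List<String>>> pendingPerCheckpoint = new TreeMap<>();
    private final List<List<String>> finished = new ArrayList<>(); // committed, visible part files

    public void write(String record) {
        inProgress.add(record);
    }

    /** What a rolling policy triggers: close the open part file so it becomes pending. */
    public void roll() {
        if (!inProgress.isEmpty()) {
            pending.add(new ArrayList<>(inProgress));
            inProgress.clear();
        }
    }

    /** Checkpoint taken: remember which pending files belong to this checkpoint. */
    public void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** Only a completed checkpoint moves its (and earlier) pending files to finished. */
    public void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<List<String>>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<List<String>> files : done.values()) {
            finished.addAll(files);
        }
        done.clear(); // removes the committed entries from the backing map
    }

    /** Job ends without a final checkpoint: in this model nothing gets committed. */
    public void close() {
        // Intentionally empty: in-progress and pending files are left behind.
    }

    public List<List<String>> finishedFiles() {
        return finished;
    }

    public static void main(String[] args) {
        // Like the test case: no checkpoint ever completes, and the job ends right away.
        PartFileLifecycleModel noCheckpoint = new PartFileLifecycleModel();
        noCheckpoint.write("string1");
        noCheckpoint.write("string2");
        noCheckpoint.close();
        System.out.println("without checkpoints: " + noCheckpoint.finishedFiles());
        // prints "without checkpoints: []"

        // Same records, but the file is rolled and a checkpoint completes first.
        PartFileLifecycleModel withCheckpoint = new PartFileLifecycleModel();
        withCheckpoint.write("string1");
        withCheckpoint.write("string2");
        withCheckpoint.roll();
        withCheckpoint.snapshotState(1L);
        withCheckpoint.notifyCheckpointComplete(1L);
        withCheckpoint.close();
        System.out.println("with a completed checkpoint: " + withCheckpoint.finishedFiles());
        // prints "with a completed checkpoint: [[string1, string2]]"
    }
}
```

In this model, simply ending the job (close()) never makes data visible;
only a roll followed by a completed checkpoint does, which is consistent
with the symptom in 1).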

2) The S3 FileSystem does not pick up credentials from the Flink
Configuration when running in embedded mode. I have a workaround for this,
but it is ugly. If you comment out the workaround line in the test case (the
FileSystem.initialize(...) call), you will end up with a
"java.net.SocketException: Host is down".

Can anyone shed light on these two issues? Thanks!

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.junit.jupiter.api.Test;

public class S3Test {
    @Test
    public void whyDoesntThisWork() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend",
                MemoryStateBackendFactory.class.getName());
        configuration.setString("s3.access.key", "****");
        configuration.setString("s3.secret.key", "****");

        // If I don't do this, the S3 filesystem never gets the credentials
        FileSystem.initialize(configuration, null);

        LocalStreamEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

        StreamingFileSink<String> s3 = StreamingFileSink
                .forRowFormat(new Path("s3://bucket/"), new SimpleStringEncoder<String>())
                .build();

        env.fromElements("string1", "string2")
            .addSink(s3);

        env.execute();

        System.out.println("Done");
    }
}


-- 
Dan Diephouse
@dandiep
