Hi Markus,

Thanks for your help. I created an environment variable in IntelliJ for
FLINK_CONF_DIR to point to the flink-conf.yaml and in it defined
fs.hdfs.hadoopconf to point to the core-site.xml, but when I do that, I get
the error: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://flink-test/flinkoutputtest.txt'.

I have been able to get it to work by using the environment variable
HADOOP_HOME to point directly to the core-site.xml, but when I do that and
I push data from Kafka, I can see the message stream printed to my
terminal, but no file gets saved to s3. I also don't see any errors. I have
the correct AWS access id and key because i am able to read from files on
s3 using Flink.

My code is below:

    public static void main(String[] args) throws Exception {

        Map<String, String> configs = ConfigUtils.loadConfigs("/
path/to/src/main/resources/error-queue.yaml");



        final ParameterTool parameterTool = ParameterTool.fromMap(configs);



        StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();

        env.getConfig().disableSysoutLogging();

        env.getConfig().setGlobalJobParameters(parameterTool)



        DataStream<String> messageStream = env

                .addSource(new FlinkKafkaConsumer09<String>(

                        parameterTool.getRequired("kafka.topic"),

                        new SimpleStringSchema(),

                        parameterTool.getProperties()));



        messageStream.print();

        messageStream.writeAsText("s3://flink-test/flinkoutputtest.
txt").setParallelism(1);



        env.execute();

On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medal...@yahoo.com> wrote:

> Sam,
>   I just happened to answer a similar question on Stackoverflow at Does
> Apache Flink AWS S3 Sink require Hadoop for local testing?
> <http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing>.
> I also submitted a PR to make that (for me) a little clearer on the Apache
> Flink documentation (https://github.com/apache/flink/pull/3054/files).
> Does Apache Flink AWS S3 Sink require Hadoop for local testing?
> I am relatively new to Apache Flink and I am trying to create a simple
> project that produces a file to an AWS S3...
>
> <http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing>
>
> Let me know if that works for you.
>
> Thanks,
> Markus
>
>
> On Tuesday, January 10, 2017 3:17 PM, Samra Kasim <
> samra.ka...@thehumangeo.com> wrote:
>
>
> Hi,
>
> I am new to Flink and I've written two small test projects: 1) to read
> data from s3 and 2) to push data to s3. However, I am getting two different
> errors for the projects relating to, i think, how the core-site.xml file is
> being read. I am running the project locally in IntelliJ. I have the
> environment variable in run configurations set to
> HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
> core-site.xml in the src/main/resources folder but get the same errors. I
> want to know if my core-site.xml file is configured correctly for using s3a
> and how to have IntelliJ read the core-site.xml file? Also, are the
> core-site.xml configurations different for reading versus writing to s3?
>
> This is my code for reading data from s3:
>
> public class DesktopWriter {
>
>     public static void main(String[] args) throws Exception {
>
>         ExecutionEnvironment env = ExecutionEnvironment.createLoc
> alEnvironment();
>         DataSet<String> data = env.readTextFile("s3://flink-t
> est/flink-test.txt");
>         data.print();
>     }
> }
> I get the error: Caused by: java.io.IOException: Cannot determine access
> key to Amazon S3. Please make sure to configure it by setting the
> configuration key 'fs.s3.accessKey'.
> This is my code for writing to S3:
> public class S3Sink {
>     public static void main(String[] args) throws Exception {
>         Map<String, String> configs = ConfigUtils.*loadConfigs*(“path/
> to/config.yaml");
>
>         final ParameterTool parameterTool = ParameterTool.*fromMap*
> (configs) ;
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.*get
> ExecutionEnvironment*();
>         env.getConfig(). disableSysoutLogging();
>         env.getConfig(). setGlobalJobParameters( parameterTool);
>
>         DataStream<String> messageStream = env
>                 .addSource(new FlinkKafkaConsumer09<String>(
>                         parameterTool.getRequired(" kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()) );
>
>         messageStream.writeAsText(" s3a://flink-test/flinktest.txt"
> ).setParallelism(1);
>
>         env.execute();
>     }
> I get the error: Caused by: java.io.IOException: The given file URI
> (s3://flink-test/flinktest.txt) points to the HDFS NameNode at
> flink-test, but the File System could not be initialized with that address:
> Unable to load AWS credentials from any provider in the chain
>
> This is my core-site.xml:
> <configuration>
>     <property>
>         <name>fs.defaultFS</name>
>         <value>hdfs://localhost:9000</ value>
>     </property>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs. s3a.S3AFileSystem</value>
>     </property>
>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>
>     <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.
> Constants -->
>     <property>
>         <name>fs.s3a.awsAccessKeyId</ name>
>         <value>*****</value>
>     </property>
>
>     <!-- set your AWS access key -->
>     <property>
>         <name>fs.s3a. awsSecretAccessKey</name>
>         <value>*****</value>
>     </property>
> </configuration>
> This is my pom.xml:
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>com.amazonaws</groupId>
>         <artifactId>aws-java-sdk</artifactId>
>         <version>1.7.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-aws</artifactId>
>         <version>2.7.2</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.httpcomponents</groupId>
>         <artifactId>httpclient</artifactId>
>         <version>4.2.5</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.httpcomponents</groupId>
>         <artifactId>httpcore</artifactId>
>         <version>4.2.5</version>
>     </dependency>
> </dependencies>
>
> Thanks!
> Sam
>
>
>

Reply via email to