Sam,

Don't point the variables at files, point them at the directories containing the files. Do you have the fs.s3.impl property defined?

Concrete example: the /home/markus/hadoop-config directory has one file, "core-site.xml", with the following content:

<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma-separated list of local directories used to buffer large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <!-- set your AWS access key ID using the key defined in org.apache.hadoop.fs.s3a.Constants -->
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <!-- set your AWS secret key -->
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>

The /home/markus/flink-config directory has one file, "flink-conf.yaml", with the following content, pointing hadoopconf to the DIRECTORY containing core-site.xml:

fs.hdfs.hadoopconf: /home/markus/hadoop-config

In IntelliJ, go to Run - Edit Configurations - <your run configuration> and set the FLINK_CONF_DIR environment variable to point to the directory containing flink-conf.yaml (i.e., in my case /home/markus/flink-config).

So everything is pointing to directories where the code looks for well-known file names. With that, the following works to write to S3 (maybe load events from a collection at first):

events.writeAsText("s3://<bucket>/<prefix-dir>");
env.execute();
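For completeness, a minimal self-contained job along those lines might look like the sketch below (the class name is made up and the bucket/prefix are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3WriteTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A few hard-coded events, just to take Kafka out of the picture.
        DataStream<String> events = env.fromElements("event-1", "event-2", "event-3");

        // Placeholder bucket and prefix; the s3:// scheme is resolved through the S3A settings in core-site.xml.
        events.writeAsText("s3://<bucket>/<prefix-dir>").setParallelism(1);

        env.execute("s3-write-test");
    }
}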
On Wednesday, January 11, 2017 10:44 AM, Samra Kasim <samra.ka...@thehumangeo.com> wrote:

Hi Markus,

Thanks for your help. I created an environment variable in IntelliJ for FLINK_CONF_DIR to point to the flink-conf.yaml, and in it defined fs.hdfs.hadoopconf to point to the core-site.xml, but when I do that, I get the error:

java.io.IOException: No file system found with scheme s3, referenced in file URI 's3://flink-test/flinkoutputtest.txt'.

I have been able to get it to work by using the environment variable HADOOP_HOME to point directly to the core-site.xml, but when I do that and push data from Kafka, I can see the message stream printed to my terminal, yet no file gets saved to S3. I also don't see any errors. I have the correct AWS access key ID and secret key because I am able to read files from S3 using Flink. My code is below:

public static void main(String[] args) throws Exception {
    Map<String, String> configs = ConfigUtils.loadConfigs("/path/to/src/main/resources/error-queue.yaml");
    final ParameterTool parameterTool = ParameterTool.fromMap(configs);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setGlobalJobParameters(parameterTool);

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer09<String>(
                    parameterTool.getRequired("kafka.topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));

    messageStream.print();
    messageStream.writeAsText("s3://flink-test/flinkoutputtest.txt").setParallelism(1);

    env.execute();
}

On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medal...@yahoo.com> wrote:

Sam,

I just happened to answer a similar question on Stackoverflow: "Does Apache Flink AWS S3 Sink require Hadoop for local testing?". I also submitted a PR to make that (for me) a little clearer in the Apache Flink documentation (https://github.com/apache/flink/pull/3054/files).

Let me know if that works for you.

Thanks,
Markus

On Tuesday, January 10, 2017 3:17 PM, Samra Kasim <samra.ka...@thehumangeo.com> wrote:

Hi,

I am new to Flink and I've written two small test projects: 1) to read data from S3 and 2) to push data to S3. However, I am getting two different errors for the projects, relating to, I think, how the core-site.xml file is being read. I am running the projects locally in IntelliJ. I have the environment variable in run configurations set to HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the core-site.xml in the src/main/resources folder, but I get the same errors. I want to know if my core-site.xml file is configured correctly for using s3a, and how to have IntelliJ read the core-site.xml file. Also, are the core-site.xml configurations different for reading versus writing to S3?

This is my code for reading data from S3:

public class DesktopWriter {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}

I get the error:

Caused by: java.io.IOException: Cannot determine access key to Amazon S3. Please make sure to configure it by setting the configuration key 'fs.s3.accessKey'.
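For reference, one way to check which keys a given core-site.xml actually resolves is to load it with the plain Hadoop Configuration API; a minimal sketch (the class name and path are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class CoreSiteCheck {

    public static void main(String[] args) {
        // Placeholder path; point this at the core-site.xml that should be picked up.
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/hadoop-config/core-site.xml"));

        // Print the keys the S3A file system looks for, to see whether the file is being read.
        System.out.println("fs.s3.impl        = " + conf.get("fs.s3.impl"));
        System.out.println("fs.s3a.access.key = " + conf.get("fs.s3a.access.key"));
        System.out.println("fs.s3a.secret.key = " + conf.get("fs.s3a.secret.key"));
    }
}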
This is my code for writing to S3:

public class S3Sink {

    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error:

Caused by: java.io.IOException: The given file URI (s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but the File System could not be initialized with that address: Unable to load AWS credentials from any provider in the chain

This is my core-site.xml:

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma separated list of local directories used to buffer large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
  <property>
    <name>fs.s3a.awsAccessKeyId</name>
    <value>*****</value>
  </property>
  <!-- set your AWS access key -->
  <property>
    <name>fs.s3a.awsSecretAccessKey</name>
    <value>*****</value>
  </property>
</configuration>

This is my pom.xml:

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.4</version>
  </dependency>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.7.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.7.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.2.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.2.5</version>
  </dependency>
</dependencies>

Thanks!
Sam