Hi Markus,

Thanks for your help. I created an environment variable in IntelliJ, FLINK_CONF_DIR, to point to the flink-conf.yaml, and in it defined fs.hdfs.hadoopconf to point to the core-site.xml. But when I do that, I get the error:

java.io.IOException: No file system found with scheme s3, referenced in file URI 's3://flink-test/flinkoutputtest.txt'.
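As a quick sanity check, independent of Flink, that a given core-site.xml is actually being picked up, something like the sketch below can be run with only the Hadoop classes already on the classpath. The file path is a placeholder for wherever the file lives in your setup; the point is to see whether the custom values (for example fs.s3.impl mapped to the S3AFileSystem) show up, or whether Hadoop is falling back to its built-in defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class CoreSiteCheck {
    public static void main(String[] args) {
        // Load the core-site.xml that fs.hdfs.hadoopconf is supposed to point at.
        // The path is a placeholder; adjust it to your environment.
        Configuration conf = new Configuration();
        conf.addResource(new Path("/path/to/hadoop-conf/core-site.xml"));

        // If the custom mappings do not appear here, Flink will not see them either.
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
        System.out.println("fs.s3.impl   = " + conf.get("fs.s3.impl"));
        System.out.println("fs.s3a.impl  = " + conf.get("fs.s3a.impl"));
    }
}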
I have been able to get it to work by using the environment variable HADOOP_HOME to point directly to the core-site.xml, but when I do that and push data from Kafka, I can see the message stream printed to my terminal, yet no file gets saved to S3. I also don't see any errors. I have the correct AWS access ID and key, because I am able to read files from S3 using Flink. My code is below:

public static void main(String[] args) throws Exception {
    Map<String, String> configs = ConfigUtils.loadConfigs("/path/to/src/main/resources/error-queue.yaml");
    final ParameterTool parameterTool = ParameterTool.fromMap(configs);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setGlobalJobParameters(parameterTool);

    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer09<String>(
                    parameterTool.getRequired("kafka.topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));

    messageStream.print();
    messageStream.writeAsText("s3://flink-test/flinkoutputtest.txt").setParallelism(1);

    env.execute();
}
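One possible explanation for the silent behaviour, to be double-checked against the Flink 1.1 documentation: with the plain writeAsText sink on a DataStream, I believe the S3 object only becomes visible once the underlying file is closed, and a Kafka-driven streaming job never terminates, so that close may simply never happen. That would match seeing the stream printed but no file and no errors. A small bounded batch job that writes to the same bucket and actually finishes can isolate the S3 write path from the streaming side; the output path below is a placeholder under the bucket from this thread.

import org.apache.flink.api.java.ExecutionEnvironment;

public class S3WriteSmokeTest {
    public static void main(String[] args) throws Exception {
        // A bounded batch job: it terminates, so the output file is closed and
        // should appear in the bucket if the S3 write path itself works.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello", "from", "flink")
           .writeAsText("s3://flink-test/smoke-test.txt")   // placeholder object path
           .setParallelism(1);
        env.execute("s3-write-smoke-test");
    }
}

If that file shows up, the S3 configuration is fine and the issue is on the streaming sink side; if it does not, the problem is in how the Hadoop configuration or credentials are resolved.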
On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medal...@yahoo.com> wrote:

> Sam,
>
> I just happened to answer a similar question on Stack Overflow: "Does Apache Flink AWS S3 Sink require Hadoop for local testing?"
> <http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing>.
> I also submitted a PR to make that (for me) a little clearer in the Apache Flink documentation (https://github.com/apache/flink/pull/3054/files).
>
> Let me know if that works for you.
>
> Thanks,
> Markus
>
> On Tuesday, January 10, 2017 3:17 PM, Samra Kasim <samra.ka...@thehumangeo.com> wrote:
>
> Hi,
>
> I am new to Flink and I've written two small test projects: 1) to read data from S3 and 2) to push data to S3. However, I am getting two different errors for the projects, relating to, I think, how the core-site.xml file is being read. I am running the project locally in IntelliJ. I have the environment variable in Run Configurations set to HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the core-site.xml in the src/main/resources folder, but I get the same errors. I want to know whether my core-site.xml file is configured correctly for using s3a, and how to have IntelliJ read the core-site.xml file. Also, are the core-site.xml configurations different for reading versus writing to S3?
>
> This is my code for reading data from S3:
>
> public class DesktopWriter {
>
>     public static void main(String[] args) throws Exception {
>
>         ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
>         DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");
>         data.print();
>     }
> }
>
> I get the error: Caused by: java.io.IOException: Cannot determine access key to Amazon S3. Please make sure to configure it by setting the configuration key 'fs.s3.accessKey'.
>
> This is my code for writing to S3:
>
> public class S3Sink {
>     public static void main(String[] args) throws Exception {
>         Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
>
>         final ParameterTool parameterTool = ParameterTool.fromMap(configs);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         DataStream<String> messageStream = env
>                 .addSource(new FlinkKafkaConsumer09<String>(
>                         parameterTool.getRequired("kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()));
>
>         messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);
>
>         env.execute();
>     }
> }
>
> I get the error: Caused by: java.io.IOException: The given file URI (s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but the File System could not be initialized with that address: Unable to load AWS credentials from any provider in the chain
>
> This is my core-site.xml:
>
> <configuration>
>     <property>
>         <name>fs.defaultFS</name>
>         <value>hdfs://localhost:9000</value>
>     </property>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>
>     <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
>     <property>
>         <name>fs.s3a.awsAccessKeyId</name>
>         <value>*****</value>
>     </property>
>
>     <!-- set your AWS access key -->
>     <property>
>         <name>fs.s3a.awsSecretAccessKey</name>
>         <value>*****</value>
>     </property>
> </configuration>
>
> This is my pom.xml:
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-streaming-java_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>         <version>1.1.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>com.amazonaws</groupId>
>         <artifactId>aws-java-sdk</artifactId>
>         <version>1.7.4</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-aws</artifactId>
>         <version>2.7.2</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.httpcomponents</groupId>
>         <artifactId>httpclient</artifactId>
>         <version>4.2.5</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.httpcomponents</groupId>
>         <artifactId>httpcore</artifactId>
>         <version>4.2.5</version>
>     </dependency>
> </dependencies>
>
> Thanks!
> Sam
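A side note on the core-site.xml quoted above: if I remember correctly, with hadoop-aws 2.7.x (the version in the quoted pom.xml) the s3a credential properties are fs.s3a.access.key and fs.s3a.secret.key rather than fs.s3a.awsAccessKeyId and fs.s3a.awsSecretAccessKey, which might explain the "Unable to load AWS credentials from any provider in the chain" error. A quick way to check this outside Flink is to set the keys programmatically and list the bucket; the key values below are placeholders.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ACredentialCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        // Property names as documented for hadoop-aws 2.7.x; double-check against
        // the version actually on the classpath. The values are placeholders.
        conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
        conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");

        // Listing the bucket confirms that the credentials and the s3a setup work at all.
        FileSystem fs = FileSystem.get(URI.create("s3a://flink-test/"), conf);
        for (FileStatus status : fs.listStatus(new Path("s3a://flink-test/"))) {
            System.out.println(status.getPath());
        }
    }
}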