Hi Flink Experts,

I am trying to read an S3 file from IntelliJ using Flink, and I am running into an AWS auth error. Can someone help? All the details are below.
I have AWS credentials in my home folder (~/.aws/credentials).

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>

    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>

    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>
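One thing I noticed while writing this up: my core-site.xml selects SimpleAWSCredentialsProvider but leaves fs.s3a.access.key and fs.s3a.secret.key empty, and the provider list in the error further down (BasicAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, InstanceProfileCredentialsProvider) does not seem to include any profile-file provider, which would explain why ~/.aws/credentials is never read. So I was wondering whether pointing S3A at the profile file would be the better route. This is only an untested sketch on my side; ProfileCredentialsProvider is the stock class from the AWS SDK, and since flink-s3-fs-hadoop shades the SDK (the error shows org.apache.flink.fs.s3base.shaded... names), the class name may need that shaded prefix instead:

    <!-- Untested idea: read the keys from ~/.aws/credentials via the AWS SDK's
         profile support instead of leaving the inline values empty. -->
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
    </property>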
POM.xml file content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>

        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>

        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>

        <!-- For Parquet -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_2.10</artifactId>
            <version>1.1.4-hadoop1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-jackson_2.11</artifactId>
            <version>3.6.7</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudsearch</artifactId>
            <version>1.11.500</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>2.8.3-1.8.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>

</project>
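A doubt I have about this setup: as far as I can tell, ENABLE_BUILT_IN_PLUGINS is only interpreted by the entrypoint of the official Flink Docker images, and I am not certain FLINK_CONF_DIR is honoured for a program launched straight from IntelliJ either. Since flink-s3-fs-hadoop is already on the classpath through the POM, I was considering loading the configuration explicitly before creating the environment. A minimal untested sketch (the directory path is just my FLINK_CONF_DIR value):

    import org.apache.flink.configuration.GlobalConfiguration
    import org.apache.flink.core.fs.FileSystem

    // Untested idea: load flink-conf.yaml by hand and pass it to the file
    // system factories, so the S3A settings apply even if FLINK_CONF_DIR
    // is ignored inside the IDE.
    val conf = GlobalConfiguration.loadConfiguration(
      "/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config")
    FileSystem.initialize(conf)

If that works, I assume the keys could also go directly into flink-conf.yaml as s3.access-key / s3.secret-key, which I believe the shaded S3A factory mirrors to fs.s3a.access.key / fs.s3a.secret.key.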
Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {

    var month: String = _

    var sum: java.lang.Integer = _

    override def toString(): String = month + "," + sum
  }
}
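To narrow the problem down before the job even touches S3, I was also thinking of checking whether the plain (unshaded) AWS SDK that is already in my POM can see the credentials from ~/.aws/credentials at all. A small untested sketch I would run from the same IntelliJ run configuration:

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

    // Untested sanity check: if this throws, the problem is the local
    // credential setup itself, not Flink's shaded S3A filesystem.
    object CredentialsCheck {
      def main(args: Array[String]): Unit = {
        val creds = new DefaultAWSCredentialsProviderChain().getCredentials
        // print only a prefix of the access key, never the secret
        println("Loaded access key: " + creds.getAWSAccessKeyId.take(4) + "...")
      }
    }

Unlike the S3A chain in the error below, the SDK's default chain does consult the profile file, so this should tell me whether my ~/.aws/credentials is readable at all.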
Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
No AWS Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Thanks

--
Thanks & Regards
Sri Tummala