Flink: I am able to access Kinesis from IntelliJ, but not S3. I have edited my Stack Overflow question with the Kinesis code; Flink is still having issues reading from S3.
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868

Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

My Stack Overflow question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Here is my IntelliJ question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Hi Flink Experts,

I am trying to read an S3 file from IntelliJ using Flink, and I am coming across an AWS auth error. Can someone help? All the details are below.

I have AWS credentials in homefolder/.aws/credentials.

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>

    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>

    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>
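One detail I am not certain about (an assumption on my side, not a confirmed fix): as far as I can tell, Hadoop's SimpleAWSCredentialsProvider only reads fs.s3a.access.key and fs.s3a.secret.key and ignores fs.s3a.session.token. So if the credentials in homefolder/.aws/credentials are temporary (STS) credentials, the session-token-aware provider would be needed instead, roughly:

<!-- sketch: S3A provider that also consumes fs.s3a.session.token (temporary STS credentials) -->
<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider</value>
</property>

Alternatively, if I read the docs correctly, the flink-s3-fs-hadoop filesystem can also pick up s3.access-key and s3.secret-key entries from flink-conf.yaml and forward them to the shaded S3A client, which would bypass core-site.xml entirely.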
POM.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have a classpath issue like NoDefClassError,... -->
                    <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>

        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>

        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>

        <!-- For Parquet -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_2.10</artifactId>
            <version>1.1.4-hadoop1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-jackson_2.11</artifactId>
            <version>3.6.7</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudsearch</artifactId>
            <version>1.11.500</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>2.8.3-1.8.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>
</project>

Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from CSV */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with the Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _
    override def toString(): String = month + "," + sum
  }
}

Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
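To narrow down whether this is a Flink wiring problem or a general credentials problem on this machine, here is a minimal standalone check (just a sketch, reusing the aws-java-sdk dependency already in the POM above; the object name is made up) that asks the plain, unshaded SDK default chain for credentials outside of Flink:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

object CredentialsCheck {
  def main(args: Array[String]): Unit = {
    // Resolves credentials the way the SDK default chain does:
    // env vars, system properties, homefolder/.aws/credentials, then instance metadata.
    val creds = new DefaultAWSCredentialsProviderChain().getCredentials
    // Print only a prefix of the key id, never the secret.
    println("Resolved access key id: " + creds.getAWSAccessKeyId.take(4) + "...")
  }
}

If this fails with the same "Unable to load credentials" message, the problem is the local credential setup rather than Flink's S3 filesystem configuration.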
Thanks

--
Thanks & Regards
Sri Tummala