Flink,

I am able to access Kinesis from IntelliJ but not S3. I have updated my
Stack Overflow question with the Kinesis code; Flink is still failing to
read from S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868


Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> My Stack Overflow question:
>
>
> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>
> On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Here is my IntelliJ question:
>>
>>
>> https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
>>
>> On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>>
>>> Hi Flink Experts,
>>>>
>>>
>>>> I am trying to read an S3 file from IntelliJ using Flink, but I am
>>>> running into an AWS auth error. Can someone help? All the details are below.
>>>>
>>>
>>>
>>>> I have Aws credentials in homefolder/.aws/credentials
>>>>
>>>
>>> My IntelliJ environment variables:-
>>>> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>>>>
>>>> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>>>>
>>>> flink-conf.yaml file content:-
>>>>
>>>> fs.hdfs.hadoopconf: 
>>>> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
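
An aside that may help: with the flink-s3-fs-hadoop filesystem loaded (as in ENABLE_BUILT_IN_PLUGINS above), Flink can also take the credentials directly from flink-conf.yaml, bypassing the Hadoop core-site.xml lookup entirely. A minimal sketch, with placeholder values:

```yaml
# Sketch of an alternative flink-conf.yaml. The s3.* keys are forwarded
# by the flink-s3-fs-hadoop filesystem to the shaded S3A client as
# fs.s3a.* settings. Values below are placeholders, not real keys.
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
```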
>>>>
>>>> core-site.xml file content:-
>>>>
>>>> <?xml version="1.0"?>
>>>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>>>
>>>> <configuration>
>>>>     <property>
>>>>         <name>fs.s3.impl</name>
>>>>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>>     </property>
>>>>
>>>>     <property>
>>>>         <name>fs.s3.buffer.dir</name>
>>>>         <value>/tmp</value>
>>>>     </property>
>>>>
>>>>     <property>
>>>>         <name>fs.s3a.server-side-encryption-algorithm</name>
>>>>         <value>AES256</value>
>>>>     </property>
>>>>
>>>>     <!--<property>
>>>>         <name>fs.s3a.aws.credentials.provider</name>
>>>>         
>>>> <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
>>>>     </property>-->
>>>>
>>>>     <property>
>>>>         <name>fs.s3a.aws.credentials.provider</name>
>>>>         
>>>> <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.access.key</name>
>>>>         <value></value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.secret.key</name>
>>>>         <value></value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.session.token</name>
>>>>         <value></value>
>>>>     </property>
>>>>
>>>>     <property>
>>>>         <name>fs.s3a.proxy.host</name>
>>>>         <value></value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.proxy.port</name>
>>>>         <value>8099</value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.proxy.username</name>
>>>>         <value></value>
>>>>     </property>
>>>>     <property>
>>>>         <name>fs.s3a.proxy.password</name>
>>>>         <value></value>
>>>>     </property>
>>>>
>>>> </configuration>
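
One thing to note about the configuration above: SimpleAWSCredentialsProvider only reads fs.s3a.access.key and fs.s3a.secret.key and ignores fs.s3a.session.token, and with the empty values above it has nothing to hand the client. A hedged alternative (whether these class names resolve through the shaded flink-s3-fs-hadoop jar is an assumption worth verifying):

```xml
<!-- Sketch: TemporaryAWSCredentialsProvider is the S3A provider that
     reads fs.s3a.session.token alongside the access/secret keys; the
     SDK default chain instead picks up ~/.aws/credentials. -->
<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
</property>
```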
>>>>
>>>> POM.xml file:-
>>>>
>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>     <modelVersion>4.0.0</modelVersion>
>>>>
>>>>     <groupId>FlinkStreamAndSql</groupId>
>>>>     <artifactId>FlinkStreamAndSql</artifactId>
>>>>     <version>1.0-SNAPSHOT</version>
>>>>     <build>
>>>>         <sourceDirectory>src/main/scala</sourceDirectory>
>>>>         <plugins>
>>>>             <plugin>
>>>>                 <!-- see http://davidb.github.com/scala-maven-plugin -->
>>>>                 <groupId>net.alchim31.maven</groupId>
>>>>                 <artifactId>scala-maven-plugin</artifactId>
>>>>                 <version>3.1.3</version>
>>>>                 <executions>
>>>>                     <execution>
>>>>                         <goals>
>>>>                             <goal>compile</goal>
>>>>                             <goal>testCompile</goal>
>>>>                         </goals>
>>>>                         <configuration>
>>>>                         </configuration>
>>>>                     </execution>
>>>>                 </executions>
>>>>             </plugin>
>>>>             <plugin>
>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>                 <artifactId>maven-surefire-plugin</artifactId>
>>>>                 <version>2.13</version>
>>>>                 <configuration>
>>>>                     <useFile>false</useFile>
>>>>                     <disableXmlReport>true</disableXmlReport>
>>>>                     <!-- If you have classpath issue like 
>>>> NoDefClassError,... -->
>>>>                     <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
>>>>                     <includes>
>>>>                         <include>**/*Test.*</include>
>>>>                         <include>**/*Suite.*</include>
>>>>                     </includes>
>>>>                 </configuration>
>>>>             </plugin>
>>>>
>>>>             <!-- "package" command plugin -->
>>>>             <plugin>
>>>>                 <artifactId>maven-assembly-plugin</artifactId>
>>>>                 <version>2.4.1</version>
>>>>                 <configuration>
>>>>                     <descriptorRefs>
>>>>                         
>>>> <descriptorRef>jar-with-dependencies</descriptorRef>
>>>>                     </descriptorRefs>
>>>>                 </configuration>
>>>>                 <executions>
>>>>                     <execution>
>>>>                         <id>make-assembly</id>
>>>>                         <phase>package</phase>
>>>>                         <goals>
>>>>                             <goal>single</goal>
>>>>                         </goals>
>>>>                     </execution>
>>>>                 </executions>
>>>>             </plugin>
>>>>         </plugins>
>>>>     </build>
>>>>     <dependencies>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-core</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-clients_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.derby</groupId>
>>>>             <artifactId>derby</artifactId>
>>>>             <version>10.13.1.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-jdbc_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-api-scala_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-api-java</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-planner_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-json</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-scala_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>        <dependency>
>>>>            <groupId>org.apache.flink</groupId>
>>>>            <artifactId>flink-streaming-scala_2.11</artifactId>
>>>>            <version>1.8.1</version>
>>>>        </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>org.apache.flink</groupId>
>>>>                    <artifactId>flink-connector-kinesis_2.11</artifactId>
>>>>                    <version>1.8.0</version>
>>>>                    <scope>system</scope>
>>>>                    
>>>> <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
>>>>                </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>org.apache.flink</groupId>
>>>>                    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>>>>                    <version>1.8.1</version>
>>>>                </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>com.amazonaws</groupId>
>>>>                    <artifactId>amazon-kinesis-client</artifactId>
>>>>                    <version>1.8.8</version>
>>>>                </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>com.amazonaws</groupId>
>>>>                    <artifactId>aws-java-sdk-kinesis</artifactId>
>>>>                    <version>1.11.579</version>
>>>>                </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>commons-dbcp</groupId>
>>>>                    <artifactId>commons-dbcp</artifactId>
>>>>                    <version>1.2.2</version>
>>>>                </dependency>
>>>>                <dependency>
>>>>                    <groupId>com.google.code.gson</groupId>
>>>>                    <artifactId>gson</artifactId>
>>>>                    <version>2.1</version>
>>>>                </dependency>
>>>>
>>>>                <dependency>
>>>>                    <groupId>commons-cli</groupId>
>>>>                    <artifactId>commons-cli</artifactId>
>>>>                    <version>1.4</version>
>>>>                </dependency>
>>>>
>>>>                <!-- 
>>>> https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
>>>>         <dependency>
>>>>             <groupId>org.apache.commons</groupId>
>>>>             <artifactId>commons-csv</artifactId>
>>>>             <version>1.7</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.commons</groupId>
>>>>             <artifactId>commons-compress</artifactId>
>>>>             <version>1.4.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>com.amazonaws</groupId>
>>>>             <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
>>>>             <version>1.4.0</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>com.amazonaws</groupId>
>>>>             <artifactId>aws-java-sdk</artifactId>
>>>>             <version>1.11.579</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <!-- For Parquet -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-hadoop-compatibility_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-avro</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>         <dependency>
>>>>             <groupId>org.apache.parquet</groupId>
>>>>             <artifactId>parquet-avro</artifactId>
>>>>             <version>1.10.0</version>
>>>>         </dependency>
>>>>         <dependency>
>>>>             <groupId>org.apache.hadoop</groupId>
>>>>             <artifactId>hadoop-mapreduce-client-core</artifactId>
>>>>             <version>3.1.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-connector-twitter_2.10</artifactId>
>>>>             <version>1.1.4-hadoop1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-connector-filesystem_2.11</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.json4s</groupId>
>>>>             <artifactId>json4s-jackson_2.11</artifactId>
>>>>             <version>3.6.7</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>com.amazonaws</groupId>
>>>>             <artifactId>aws-java-sdk-cloudsearch</artifactId>
>>>>             <version>1.11.500</version>
>>>>         </dependency>
>>>>
>>>>         <!-- 
>>>> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 
>>>> -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-shaded-hadoop2</artifactId>
>>>>             <version>2.8.3-1.8.3</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-s3-fs-hadoop</artifactId>
>>>>             <version>1.8.1</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.hadoop</groupId>
>>>>             <artifactId>hadoop-common</artifactId>
>>>>             <version>2.8.5</version>
>>>>         </dependency>
>>>>
>>>>
>>>>     </dependencies>
>>>>
>>>> </project>
>>>>
>>>> Scala Code:-
>>>>
>>>> package com.aws.examples.s3
>>>>
>>>>
>>>> import org.apache.flink.api.common.typeinfo.Types
>>>> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
>>>> import org.apache.flink.table.api.{Table, TableEnvironment}
>>>> import org.apache.flink.table.api.java.BatchTableEnvironment
>>>> import org.apache.flink.table.sources.CsvTableSource
>>>>
>>>> object Batch {
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val env: ExecutionEnvironment =
>>>>       ExecutionEnvironment.getExecutionEnvironment
>>>>     val tableEnv: BatchTableEnvironment =
>>>>       TableEnvironment.getTableEnvironment(env)
>>>>     /* create table from csv */
>>>>
>>>>     val tableSrc = CsvTableSource
>>>>       .builder()
>>>>       .path("s3a://bucket/csvfolder/avg.txt")
>>>>       .fieldDelimiter(",")
>>>>       .field("date", Types.STRING)
>>>>       .field("month", Types.STRING)
>>>>       .field("category", Types.STRING)
>>>>       .field("product", Types.STRING)
>>>>       .field("profit", Types.INT)
>>>>       .build()
>>>>
>>>>     tableEnv.registerTableSource("CatalogTable", tableSrc)
>>>>
>>>>     val catalog: Table = tableEnv.scan("CatalogTable")
>>>>     /* querying with Table API */
>>>>
>>>>     val order20: Table = catalog
>>>>       .filter(" category === 'Category5'")
>>>>       .groupBy("month")
>>>>       .select("month, profit.sum as sum")
>>>>       .orderBy("sum")
>>>>
>>>>     val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, 
>>>> classOf[Row1])
>>>>
>>>>     order20Set.writeAsText("src/main/resources/table1/table1")
>>>>
>>>>     //tableEnv.toAppendStream(order20, 
>>>> classOf[Row]).writeAsText("/home/jivesh/table")
>>>>     env.execute("State")
>>>>
>>>>   }
>>>>
>>>>   class Row1 {
>>>>
>>>>     var month: String = _
>>>>
>>>>     var sum: java.lang.Integer = _
>>>>
>>>>     override def toString(): String = month + "," + sum
>>>>
>>>>   }
>>>>
>>>> }
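
Before involving Flink at all, it can help to confirm that the AWS SDK can resolve credentials on its own. A minimal sketch, assuming the aws-java-sdk dependency from the POM above is on the classpath:

```scala
// Hedged sanity check: resolve credentials via the SDK's default chain
// (env vars, system properties, ~/.aws/credentials, instance profile).
// getCredentials throws if nothing in the chain yields credentials --
// the same failure mode as the shaded exception below.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

object CredentialsCheck {
  def main(args: Array[String]): Unit = {
    val creds = new DefaultAWSCredentialsProviderChain().getCredentials
    // Print only the (non-secret) access key id to confirm resolution.
    println(s"Resolved access key id: ${creds.getAWSAccessKeyId}")
  }
}
```

If this fails with the same "Unable to load credentials" message, the problem is in the local AWS setup rather than in Flink.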
>>>>
>>>> Error:-
>>>> *Caused by:
>>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
>>>> to load credentials from service endpoint*
>>>>
>>>> *Caused by:
>>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No
>>>> AWS Credentials provided by BasicAWSCredentialsProvider
>>>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable
>>>> to load credentials from service endpoint*
>>>>
>>>>
>>>> Thanks
>>>>
>>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>>
>

-- 
Thanks & Regards
Sri Tummala
