Which I already did in my pom; it's still not working.

Thanks
Sri
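For reference, a minimal sketch (untested; object and value names are illustrative, and it assumes the Flink 1.8.x FileSystem.initialize(Configuration) overload with flink-s3-fs-hadoop on the classpath) of handing the flink-conf.yaml settings to the filesystem factories before the local job starts, in case the IDE run does not pick up FLINK_CONF_DIR by itself:

import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.core.fs.FileSystem

object InitS3FileSystem {
  def main(args: Array[String]): Unit = {
    // Load the flink-conf.yaml that FLINK_CONF_DIR points at (directory path
    // below is just the one quoted later in this thread) and hand it to the
    // registered filesystem factories, including the shaded S3 one that
    // flink-s3-fs-hadoop provides.
    val flinkConf = GlobalConfiguration.loadConfiguration(
      "/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config")
    FileSystem.initialize(flinkConf)
    // ...then create the ExecutionEnvironment and run the job as usual.
  }
}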
On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler <ches...@apache.org> wrote:

The concept of plugins does not exist in 1.8.1. As a result it should be
sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
your project.

On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:

Let's close this issue, guys; please answer my questions. I am using Flink 1.8.1.

Thanks
Sri

On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
ConfigConstants.ENV_FLINK_LIB_DIR. Will that work?

Thanks
Sri

On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

I am not getting what you both are talking about; let's be clear.

What is a plugin? Is it a JAR that I have to download from the internet and
place in a folder? Is flink-s3-fs-hadoop the JAR I have to download?

Will the solution below work?

https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being

Thanks
Sri

On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler <ches...@apache.org> wrote:

Well, you could do this before running the job:

// set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable,
// pointing to a directory containing the plugins
PluginManager pluginManager =
    PluginUtils.createPluginManagerFromRootFolder(new Configuration());
FileSystem.initialize(new Configuration(), pluginManager);

On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:

Hi.

I had the same problem. Flink uses a plugin to access S3. When you run
locally it starts a mini cluster, and the mini cluster doesn't load plugins,
so it's not possible without modifying Flink. In my case I wanted to
investigate savepoints through the Flink processor API, and the workaround
was to build my own version of the processor API and include the missing part.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 10 Mar 2021, at 17:33, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Flink,

I am able to access Kinesis from IntelliJ but not S3. I have edited my
Stack Overflow question with the Kinesis code; Flink is still having issues
reading S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868

Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

My Stack Overflow question:

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
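For completeness, a minimal Scala sketch (untested; names are illustrative) of the plugin-based initialization Chesnay quoted above. It only applies to Flink versions that actually ship the plugin mechanism (1.10+), not to 1.8.1, and it assumes the FLINK_PLUGINS_DIR environment variable points at a folder laid out like the distribution's plugins/ directory (e.g. s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar):

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.plugin.PluginUtils

object InitS3Plugin {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Discovers the plugins under FLINK_PLUGINS_DIR and registers the
    // filesystems they provide (here: the S3 filesystem) before the
    // mini cluster is started.
    val pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf)
    FileSystem.initialize(conf, pluginManager)
    // ...then build the execution environment and run the job.
  }
}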
On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Here is my IntelliJ question:

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Hi Flink Experts,

I am trying to read an S3 file from my IntelliJ using Flink, and I am coming
across an AWS auth error. Can someone help? Below are all the details.

I have AWS credentials in homefolder/.aws/credentials

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
  <property><name>fs.s3.buffer.dir</name><value>/tmp</value></property>
  <property><name>fs.s3a.server-side-encryption-algorithm</name><value>AES256</value></property>
  <!--<property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value></property>-->
  <property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value></property>
  <property><name>fs.s3a.access.key</name><value></value></property>
  <property><name>fs.s3a.secret.key</name><value></value></property>
  <property><name>fs.s3a.session.token</name><value></value></property>
  <property><name>fs.s3a.proxy.host</name><value></value></property>
  <property><name>fs.s3a.proxy.port</name><value>8099</value></property>
  <property><name>fs.s3a.proxy.username</name><value></value></property>
  <property><name>fs.s3a.proxy.password</name><value></value></property>
</configuration>
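Note that the core-site.xml above pins fs.s3a.aws.credentials.provider to SimpleAWSCredentialsProvider while leaving fs.s3a.access.key and fs.s3a.secret.key empty, which lines up with the "No AWS Credentials provided" error further down. A minimal sketch (untested; placeholder values and names are hypothetical, and it assumes flink-s3-fs-hadoop forwards fs.s3a.* keys from the Flink Configuration to the shaded Hadoop S3A filesystem) of supplying the keys for a local run instead:

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

object S3CredentialsForLocalRun {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Placeholder values; these are the keys read both by
    // SimpleAWSCredentialsProvider and by the BasicAWSCredentialsProvider
    // named in the error message below.
    conf.setString("fs.s3a.access.key", "<access-key>")
    conf.setString("fs.s3a.secret.key", "<secret-key>")
    FileSystem.initialize(conf)
    // ...then run the job.
  }
}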
POM.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FlinkStreamAndSql</groupId>
  <artifactId>FlinkStreamAndSql</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.3</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration></configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.13</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <!-- "package" command plugin -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis_2.11</artifactId>
      <version>1.8.0</version>
      <scope>system</scope>
      <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
    </dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
    <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
    <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
    <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
    <!-- For Parquet -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
  </dependencies>
</project>

Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {

    var month: String = _

    var sum: java.lang.Integer = _

    override def toString(): String = month + "," + sum
  }

}

Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
No AWS Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Thanks
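One observation on that error: the provider chain it names (BasicAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, InstanceProfileCredentialsProvider) does not include a profile provider, so the credentials in homefolder/.aws/credentials are never consulted. Since EnvironmentVariableCredentialsProvider is in the chain, a tiny sanity check (nothing Flink-specific; the object name is illustrative) is whether the IntelliJ run configuration actually passes the AWS variables to the JVM:

object AwsEnvCheck {
  def main(args: Array[String]): Unit = {
    // EnvironmentVariableCredentialsProvider, listed in the error's chain,
    // reads these variables; print whether the run configuration sets them.
    Seq("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN")
      .foreach(k => println(s"$k is set: ${sys.env.contains(k)}"))
  }
}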
--
Thanks & Regards
Sri Tummala