Hi, I'm pretty new to this. I keep struggling to read a file from S3 and keep getting this exception:

Caused by: java.lang.NumberFormatException: For input string: "64M"

(If anyone can link me to a working GitHub example, that would be awesome.) What am I doing wrong?
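From searching around, I suspect the "64M" comes from fs.s3a.multipart.size: newer Hadoop config files ship size defaults with an "M" suffix, while the older S3A code on my classpath seems to parse the value as a plain long. As a workaround I'm considering forcing plain numeric byte values before opening the file. This is only a sketch of that idea; the object and method names are mine, and the extra property keys and byte values are guesses, not a confirmed fix:

import org.apache.hadoop.conf.Configuration

object S3AConfWorkaround {
  // Guess at a workaround: set the S3A size options to plain byte values so the
  // older S3A client never has to parse suffixed defaults like "64M".
  // The threshold/block.size overrides are speculative on my part.
  def s3aConf(): Configuration = {
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.s3a.multipart.size", "67108864")        // 64 MB instead of "64M"
    conf.set("fs.s3a.multipart.threshold", "134217728")  // 128 MB instead of "128M"
    conf.set("fs.s3a.block.size", "33554432")            // 32 MB instead of "32M"
    conf
  }
}

The idea would be to build the Configuration this way inside run() instead of the two conf lines I currently have.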
This is what my code looks like:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String] {
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}

sbt dependencies:

ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0",
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" // % "provided"
  )
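One more thing I noticed while writing this up: my build mixes hadoop-common 3.3.0 with hadoop-aws 2.7.3 and the 1.7.4 AWS SDK. In case that version mismatch is the real problem, this is roughly how I would try aligning everything on one Hadoop version. This is only my guess, not a confirmed fix; the 3.3.0 choice and the hadoopDependencies name are mine, and I dropped the explicit com.amazonaws dependency on the assumption that hadoop-aws 3.x pulls in aws-java-sdk-bundle by itself:

// Sketch of the build with the Hadoop artifacts aligned on one version
// (assuming 3.3.0 here); not sure yet whether this resolves the "64M" error.
ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val hadoopVersion = "3.3.0"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val hadoopDependencies = Seq(
  // hadoop-aws 3.x should bring in aws-java-sdk-bundle on its own,
  // so no separate com.amazonaws dependency here (assumption on my part)
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= hadoopDependencies,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1"
  )

Does that sound like the right direction, or is the problem somewhere else entirely?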