Hi,

I'm fairly new to Flink and I've been struggling to read a file from S3, but I keep getting this exception:

Caused by: java.lang.NumberFormatException: For input string: "64M"

(If anyone can link me to a working GitHub example, that would be awesome.) What am I doing wrong?

This is what my code looks like:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String] {
  // Flag flipped by cancel() so the read loop can stop
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null

    try {
      while (isRunning && { pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    } finally {
      parquetFileReader.close()
    }
  }

  override def cancel(): Unit = isRunning = false
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment


    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}
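
For what it's worth, to check that the Parquet-reading part works outside of S3, this is the kind of local test I was planning to run (the local path is just a placeholder for a file I'd copy down from the bucket):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object LocalParquetSmokeTest {
  def main(args: Array[String]): Unit = {
    // Placeholder: a local copy of one of the parquet files from the bucket
    val localPath = "file:///tmp/sample.parquet"
    val conf = new Configuration()
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(localPath), conf))
    try {
      // Print the schema and row-group count just to confirm the file opens and parses
      println(reader.getFileMetaData.getSchema)
      println(s"row groups: ${reader.getRowGroups.size()}")
    } finally {
      reader.close()
    }
  }
}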

sbt dependencies:


ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %%
"flink-table-planner-blink" % "1.12.1" //% "provided"
  )
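
One thing I was going to try (just a guess) is overriding fs.s3a.multipart.size with a plain byte value right after creating the Configuration in the source function, assuming the "64M" comes from that property's default in newer hadoop-common releases and that the hadoop-aws 2.7.x S3AFileSystem parses it as a plain long:

    // Guess / workaround only: plain numeric value so an older S3AFileSystem
    // doesn't have to parse the suffixed "64M" form (67108864 bytes = 64 MB)
    conf.set("fs.s3a.multipart.size", "67108864")

I'm not sure whether that is a real fix or just papering over a dependency mismatch, so any pointers are appreciated.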
