Hi ,
I am pretty new. I am keep on struggling to read a file from s3 but getting
this weird exception :
Caused by: java.lang.NumberFormatException: For input string: "64M" (if
anyone can link me to a working github example that will be awesome) . what
am i doing wrong?
This is how my code looks like this :
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory
class ParquetSourceFunction extends SourceFunction[String]{
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val inputPath = "s3a://foo/year=2000/month=02/"
val conf = new Configuration()
conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
val readFooter = ParquetFileReader.open(hadoopFile)
val metadata = readFooter.getFileMetaData
val schema = metadata.getSchema
val parquetFileReader = new ParquetFileReader(conf, metadata, new
Path(inputPath), readFooter.getRowGroups, schema.getColumns)
var pages: PageReadStore = null
try {
while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
val rows = pages.getRowCount
val columnIO = new ColumnIOFactory().getColumnIO(schema)
val recordReader = columnIO.getRecordReader(pages, new
GroupRecordConverter(schema))
(0L until rows).foreach { _ =>
val group = recordReader.read()
val myString = group.getString("field_name", 0)
ctx.collect(myString)
}
}
}
}
override def cancel(): Unit = ???
}
object Job {
def main(args: Array[String]): Unit = {
// set up the execution environment
lazy val env = StreamExecutionEnvironment.getExecutionEnvironment
lazy val stream = env.addSource(new ParquetSourceFunction)
stream.print()
env.execute()
}
}
sbt dependencies :
ThisBuild / scalaVersion := "2.12.1"
val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
"org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
"org.apache.flink" %% "flink-parquet" % flinkVersion,
"org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
val s3Dependencies = Seq(
("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)
val serializationDependencies = Seq(
("org.apache.avro" % "avro" % "1.7.7"),
("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
("org.apache.parquet" % "parquet-avro" % "1.8.1"))
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies,
libraryDependencies ++= s3Dependencies,
libraryDependencies ++= serializationDependencies,
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
libraryDependencies += "org.apache.flink" %%
"flink-table-planner-blink" % "1.12.1" //% "provided"
)