Thanks, Rafi. I will try this, but yes, if partitioning is not possible then I will also have to look for some other solution.
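For the partitioned output, I plan to try something along the lines of the StreamingFileSink + custom BucketAssigner approach you suggest below. Roughly like this (just a sketch; the event-time field name "event_ts", the schema, the output path and the result stream are placeholders for my actual setup):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Assigns each record to a year=.../month=.../day=... bucket based on an
// event-time field inside the record ("event_ts" is a placeholder name).
public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter PARTITION_FORMAT =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd")
                    .withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        // Assumes the record carries its event time as epoch milliseconds.
        long eventTime = ((Number) element.get("event_ts")).longValue();
        return PARTITION_FORMAT.format(Instant.ofEpochMilli(eventTime));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Wiring it into a Parquet bulk-format sink (schema, outputPath and
// resultStream stand for my own Avro schema, target directory and DataStream):
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

resultStream.addSink(sink);

From what I understand of the note you point to in [2], bulk formats like Parquet roll a new part file on every checkpoint, so I expect to end up with many small files per bucket; I still need to think about that part.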
On Wed, Feb 19, 2020 at 3:44 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). There could be a
> better/newer way, but this is how I read & write Parquet with
> hadoop-compatibility:
>
>> // imports
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
>> import org.apache.flink.api.java.operators.DataSource;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.hadoopcompatibility.HadoopInputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.parquet.avro.AvroParquetInputFormat;
>> import org.apache.parquet.avro.AvroParquetOutputFormat;
>> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>>
>> // Creating Parquet input format
>> Configuration conf = new Configuration();
>> Job job = Job.getInstance(conf);
>> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
>> AvroParquetInputFormat.setInputDirRecursive(job, true);
>> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
>> HadoopInputFormat<Void, GenericRecord> inputFormat =
>>     HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);
>>
>> // Creating Parquet output format
>> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
>> AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
>> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>> AvroParquetOutputFormat.setCompressOutput(job, true);
>> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
>> HadoopOutputFormat<Void, GenericRecord> outputFormat =
>>     new HadoopOutputFormat<>(parquetOutputFormat, job);
>>
>> DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);
>>
>> // Start processing...
>>
>> // Writing result as Parquet
>> resultDataSet.output(outputFormat);
>
> Regarding writing partitioned data: as far as I know, there is no way to
> achieve that with the DataSet API and hadoop-compatibility.
>
> You could implement this by reading the input files as a stream and then
> using a StreamingFileSink with a custom BucketAssigner [1].
> The problem with that (which has not been resolved yet, AFAIK) is
> described in [2] under "Important Notice 2".
>
> Sadly, for this use case I eventually chose Spark to do the job...
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
>
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Rafi,
>>
>> I have a similar use case where I want to read Parquet files into a
>> DataSet, perform some transformations, and similarly write the result
>> partitioned by year, month and day.
>>
>> I am stuck at the first step: how to read and write Parquet files using
>> hadoop-compatibility.
>>
>> Please help me with this, and also let me know if you find a solution
>> for writing the data partitioned.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com>
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> At the moment I do not see any support for Parquet in the DataSet API
>>> except HadoopOutputFormat, mentioned in the Stack Overflow question. I
>>> have cc'ed Fabian and Aljoscha; maybe they can provide more information.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a batch job which reads Parquet, does some aggregations and
>>> writes back as Parquet files.
>>> I would like the output to be partitioned by year, month and day, based
>>> on event time, similarly to the functionality of the BucketingSink.
>>>
>>> I was able to achieve the reading/writing to/from Parquet by using the
>>> hadoop-compatibility features.
>>> I couldn't find a way to partition the data by year, month and day to
>>> create a folder hierarchy accordingly. Everything is written to a single
>>> directory.
>>>
>>> I could find an unanswered question about this issue:
>>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>>
>>> Can anyone suggest a way to achieve this? Maybe there's a way to
>>> integrate the BucketingSink with the DataSet API? Another solution?
>>>
>>> Rafi
>>>

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>

<http://www.cse.iitm.ac.in/%7Eanujjain/>