Thanks, Rafi. I will try this, but yes, if partitioning is not possible
then I will also have to look for some other solution.
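
For anyone following the thread: the year/month/day layout discussed below
comes down to mapping each record's event time to a directory name, which is
what a custom BucketAssigner would return from getBucketId. Here is a minimal
plain-Java sketch of just that mapping, with no Flink dependency. The class
name DatePartitionPath, the method bucketIdFor, the Hive-style
year=/month=/day= naming and the use of UTC are all assumptions made for
illustration; none of them come from Flink or from Rafi's job.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns an event timestamp into a partition directory name.
final class DatePartitionPath {

    // Assumption: Hive-style directory names, with event time interpreted in UTC.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("'year='uuuu'/month='MM'/day='dd")
                    .withZone(ZoneOffset.UTC);

    private DatePartitionPath() {
    }

    // Returns the relative directory a record with this event time belongs to.
    static String bucketIdFor(long eventTimeMillis) {
        return FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    public static void main(String[] args) {
        long eventTime = Instant.parse("2020-02-19T15:44:00Z").toEpochMilli();
        // prints "year=2020/month=02/day=19"
        System.out.println(bucketIdFor(eventTime));
    }
}
```

In a streaming job, a custom BucketAssigner could extract the event time from
each record and return a string like this as the bucket id. How the event time
is read from a GenericRecord depends on the schema, so that part is left out.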

On Wed, Feb 19, 2020 at 3:44 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). There could be a
> better/newer way, but this is how I read & write Parquet with
> hadoop-compatibility:
>
> // imports
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
> import org.apache.parquet.avro.AvroParquetOutputFormat;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> // Creating Parquet input format
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat =
>     new AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat =
>     HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
>         GenericRecord.class, job);
>
> // Creating Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat =
>     new AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job,
>     new Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat =
>     new HadoopOutputFormat<>(parquetOutputFormat, job);
>
> // Reading the input (env is the batch ExecutionEnvironment)
> DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
>     env.createInput(inputFormat);
>
> // Start processing...
>
> // Writing result as Parquet
> resultDataSet.output(outputFormat);
>
> Regarding writing partitioned data: as far as I know, there is no way to
> achieve that with the DataSet API and hadoop-compatibility.
>
> You could implement this by reading the input files as a stream and then
> using StreamingFileSink with a custom BucketAssigner [1].
> The problem with that approach (not yet resolved, AFAIK) is described
> in "Important Notice 2" here [2].
>
> Sadly, for this use case I eventually chose Spark to do the job...
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
>
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:
>
>> Hi Rafi,
>>
>> I have a similar use case where I want to read Parquet files into a
>> DataSet, perform some transformations, and similarly write the result
>> partitioned by year, month, and day.
>>
>> I am stuck at the first step: how to read and write Parquet files
>> using hadoop-compatibility.
>>
>> Please help me with this, and also let me know if you found a solution
>> for writing partitioned data.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com>
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> At the moment I do not see any support for Parquet in the DataSet API
>>> except HadoopOutputFormat, mentioned in the Stack Overflow question. I have
>>> cc'ed Fabian and Aljoscha; maybe they can provide more information.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a batch job which reads Parquet, does some aggregations and
>>> writes the results back as Parquet files.
>>> I would like the output to be partitioned by year, month, and day of the
>>> event time, similar to the functionality of the BucketingSink.
>>>
>>> I was able to achieve the reading/writing to/from Parquet by using the
>>> hadoop-compatibility features.
>>> I couldn't find a way to partition the data by year, month, day to
>>> create a folder hierarchy accordingly. Everything is written to a single
>>> directory.
>>>
>>> I could find an unanswered question about this issue:
>>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>>
>>> Can anyone suggest a way to achieve this? Maybe there's a way to
>>> integrate the BucketingSink with the DataSet API? Another solution?
>>>
>>> Rafi
>>>
>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

