Hi Anuj,

another option would be to use the new Hive connectors. Have you looked into those? They might work on SQL-internal data types, which is why you would need to use the Table API for them.
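
If you go down that route, a rough (and untested) sketch on Flink 1.10 could look like the following; the catalog name, database, table names and the hive-conf path are only placeholders:

    // Untested sketch: Table API + Hive catalog (names and paths are placeholders)
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
    tableEnv.registerCatalog("myhive", hiveCatalog);
    tableEnv.useCatalog("myhive");

    // If the Hive table is partitioned (e.g. by year/month/day), the folder
    // layout is handled by the table definition, not by the job itself.
    tableEnv.sqlUpdate(
        "INSERT INTO aggregated_events PARTITION (`year`='2020', `month`='02', `day`='19') "
            + "SELECT id, COUNT(*) FROM raw_events GROUP BY id");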

Maybe Bowen in CC can help you here.

Regards,
Timo

On 19.02.20 11:14, Rafi Aroch wrote:
Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). There could be a better/newer way by now, but this is how I read & write Parquet with hadoop-compatibility:

    // imports
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.avro.AvroParquetInputFormat;
    import org.apache.parquet.avro.AvroParquetOutputFormat;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    // Creating the Parquet input format
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
    AvroParquetInputFormat.setInputDirRecursive(job, true);
    AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
    HadoopInputFormat<Void, GenericRecord> inputFormat =
        HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);

    // Creating the Parquet output format
    AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
    AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
    AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    AvroParquetOutputFormat.setCompressOutput(job, true);
    AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
    HadoopOutputFormat<Void, GenericRecord> outputFormat =
        new HadoopOutputFormat<>(parquetOutputFormat, job);

    // Reading the input as Tuple2<Void, GenericRecord> records
    DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat); // Start processing...

    // Writing the result as Parquet
    resultDataSet.output(outputFormat);
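
The resultDataSet above is whatever your transformations produce. Since hadoop-compatibility wraps every record in a Tuple2<Void, GenericRecord>, the middle step could look roughly like this (untested sketch; the map body is a placeholder for your real logic):

    // Additional imports for this sketch
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;

    // Unwrap the record from the Tuple2, transform it, and wrap it again so it
    // matches what the HadoopOutputFormat expects.
    DataSet<Tuple2<Void, GenericRecord>> resultDataSet = inputFileSource
        .map(new MapFunction<Tuple2<Void, GenericRecord>, Tuple2<Void, GenericRecord>>() {
            @Override
            public Tuple2<Void, GenericRecord> map(Tuple2<Void, GenericRecord> value) {
                GenericRecord record = value.f1;
                // ... aggregation / transformation logic goes here ...
                return new Tuple2<>(null, record);
            }
        });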


Regarding writing partitioned data: as far as I know, there is no way to achieve that with the DataSet API using hadoop-compatibility.

You could implement this by reading the input files as a stream and then writing with a StreamingFileSink and a custom BucketAssigner [1]. The problem with that (which has not been resolved yet, AFAIK) is described in "Important Notice 2" here [2].
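
For illustration, here is a rough, untested sketch of such a BucketAssigner and of how it could be wired into a StreamingFileSink; the "eventTime" field name (assumed to hold epoch millis), the class name and the buildSink() helper are placeholders:

    // Untested sketch: bucket by year/month/day derived from the record's event time.
    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

        private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketId(GenericRecord element, Context context) {
            // Derive the year/month/day sub-folder from the record's event time
            // (assumes an epoch-millis long field called "eventTime").
            long eventTimeMillis = (long) element.get("eventTime");
            return FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }

        // Wiring the assigner into a Parquet-writing StreamingFileSink
        // (schema and outputPath are placeholders):
        public static StreamingFileSink<GenericRecord> buildSink(Schema schema, String outputPath) {
            return StreamingFileSink
                .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();
        }
    }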

Sadly, I have to say that eventually, for this use case, I chose Spark to do the job...

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi


On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:

    Hi Rafi,

    I have a similar use case where I want to read Parquet files into a
    DataSet, perform some transformations, and similarly write the result
    partitioned by year, month and day.

    I am stuck at the first step: how to read and write
    Parquet files using hadoop-compatibility.

    Please help me with this, and also let me know if you find a solution
    for writing the data partitioned.

    Thanks,
    Anuj

    On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com> wrote:

        Hi Rafi,

        At the moment I do not see any support for Parquet in the DataSet API
        except HadoopOutputFormat, mentioned in the Stack Overflow question.
        I have cc'ed Fabian and Aljoscha; maybe they can provide more
        information.

        Best,
        Andrey

        On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:

        Hi,

        I'm writing a Batch job which reads Parquet, does some
        aggregations and writes back as Parquet files.
        I would like the output to be partitioned by year, month and day
        based on event time, similar to the functionality of the
        BucketingSink.

        I was able to achieve the reading/writing to/from Parquet by
        using the hadoop-compatibility features.
        I couldn't find a way to partition the data by year, month,
        day to create a folder hierarchy accordingly. Everything is
        written to a single directory.

        I could only find an unanswered question about this issue:
        https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit

        Can anyone suggest a way to achieve this? Maybe there's a way
        to integrate the BucketingSink with the DataSet API? Another
        solution?

        Rafi



    --
    Thanks & Regards,
    Anuj Jain
    Mob. : +91- 8588817877
    Skype : anuj.jain07



