Hi Arvid,

Thanks for the detailed reply. I am using the DataSet API and it's a batch job, so I am wondering whether the option you provided works for that.
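For reference, a custom assigner along the lines Arvid describes below might look like the following. This is a minimal sketch, assuming a streaming job and a hypothetical MyEvent type with a getEventTime() accessor returning epoch millis (the quoted javadoc mentions setBucketAssigner, but in recent Flink releases the builder method is withBucketAssigner):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyBucketAssigner implements BucketAssigner<MyEvent, String> {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(MyEvent element, Context context) {
        // Derive the bucket (sub-directory) from the element's own
        // timestamp, so records land under dt=2019-12-27 style paths.
        return "dt=" + DAY.format(Instant.ofEpochMilli(element.getEventTime()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Wiring it into the sink:
StreamingFileSink<MyEvent> sink = StreamingFileSink
        .forRowFormat(new Path("s3://my-bucket/events"),
                new SimpleStringEncoder<MyEvent>("UTF-8"))
        .withBucketAssigner(new DailyBucketAssigner())
        .build();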
Thanks,
Anuj

On Wed, Jan 8, 2020 at 7:01 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> StreamingFileSink has a BucketAssigner that you can use for that purpose.
>
> From the javadoc: The sink uses a BucketAssigner to determine in which
> bucket directory each element should be written to inside the base
> directory. The BucketAssigner can, for example, use time or a property of
> the element to determine the bucket directory. The default BucketAssigner
> is a DateTimeBucketAssigner which will create one new bucket every hour.
> You can specify a custom BucketAssigner using the
> setBucketAssigner(bucketAssigner) method, after calling forRowFormat(Path,
> Encoder) or forBulkFormat(Path, BulkWriter.Factory).
>
> If that doesn't work for you, please let me know. Btw, are you using event
> or processing time?
>
> Best,
>
> Arvid
>
> On Fri, Dec 27, 2019 at 4:24 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Anuj,
>>
>> Actually, I am not familiar with how to partition via timestamp. Flink's
>> streaming BucketingSink provides this feature. [1] You may refer to that
>> link and customize your sink.
>>
>> I can ping a professional committer who knows more details of the FS
>> connector than me; @kklou...@gmail.com <kklou...@gmail.com> may be able
>> to help you.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
>>
>> On Fri, Dec 27, 2019 at 1:51 AM, aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks Vino.
>>>
>>> I am able to write data in parquet now. But now the issue is how to
>>> write a dataset to multiple output paths, partitioned by timestamp.
>>> I want to partition the data by date.
>>>
>>> I am currently writing like this, which writes everything to a single
>>> output path:
>>>
>>> DataSet<Tuple2<Void, GenericRecord>> df = allEvents
>>>     .flatMap(new EventMapProcessor(schema.toString()))
>>>     .withParameters(configuration);
>>>
>>> Job job = Job.getInstance();
>>> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
>>> HadoopOutputFormat<Void, GenericRecord> parquetFormat =
>>>     new HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
>>> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>>>
>>> df.output(parquetFormat);
>>> env.execute();
>>>
>>> Please suggest.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Mon, Dec 23, 2019 at 12:59 PM vino yang <yanghua1...@gmail.com>
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> After searching on GitHub, I found a demo repository about how to use
>>>> parquet in Flink. [1]
>>>>
>>>> You can have a look. I am not sure whether it will be helpful or not.
>>>>
>>>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Sat, Dec 21, 2019 at 7:03 PM, aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am getting a set of events in JSON that I am dumping into hourly
>>>>> buckets in S3. I am reading these hourly buckets and creating a
>>>>> DataSet<String>.
>>>>>
>>>>> I want to write this dataset as parquet, but I am not able to figure
>>>>> out how. Can somebody help me with this?
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob.: +91-8588817877
>>> Skype: anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>

--
Thanks & Regards,
Anuj Jain
Mob.: +91-8588817877
Skype: anuj.jain07
<http://www.oracle.com/>

<http://www.cse.iitm.ac.in/%7Eanujjain/>
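For anyone following up on the DataSet side of this thread, one possible workaround is sketched below: pre-split the DataSet by date and register one parquet output per partition before a single env.execute(). This assumes the set of dates is small and known up front; dates and extractDate() are hypothetical helpers, and the parquet setup mirrors the snippet quoted above.

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

// One filter + sink per date; all sinks run in the same batch job.
for (String date : dates) {
    DataSet<Tuple2<Void, GenericRecord>> partition =
            df.filter(t -> date.equals(extractDate(t.f1)));

    Job job = Job.getInstance();
    AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
    HadoopOutputFormat<Void, GenericRecord> parquetFormat =
            new HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
    FileOutputFormat.setOutputPath(job, new Path(outputDirectory + "/dt=" + date));

    partition.output(parquetFormat);
}
env.execute();

Note that each filter implies another pass over df, so for a large number of dates the streaming approach with a custom BucketAssigner that Arvid suggested is likely the better fit.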