Hi Arvid,

Thanks for the detailed reply. I am using the DataSet API and it's a batch
job, so I am wondering whether the option you provided works for that.

Thanks,
Anuj

On Wed, Jan 8, 2020 at 7:01 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> StreamingFileSink has a BucketAssigner that you can use for that purpose.
>
> From the javadoc: The sink uses a BucketAssigner to determine in which
> bucket directory each element should be written to inside the base
> directory. The BucketAssigner can, for example, use time or a property of
> the element to determine the bucket directory. The default BucketAssigner
> is a DateTimeBucketAssigner which will create one new bucket every hour.
> You can specify a custom BucketAssigner using the
> setBucketAssigner(bucketAssigner) method, after calling forRowFormat(Path,
> Encoder) or forBulkFormat(Path, BulkWriter.Factory).
>
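As a rough illustration of what a time-based bucket assigner computes, here is a minimal sketch in plain Java with no Flink dependency. The class `DateBucketSketch` and the helper `bucketIdFor` are hypothetical names made up for this example; in an actual job the same logic would presumably sit inside the `getBucketId` method of a custom BucketAssigner. Using UTC as the time zone is also an assumption.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateBucketSketch {

    // Hypothetical helper: formats an event timestamp (epoch millis)
    // as a bucket directory name using the given pattern and zone.
    static String bucketIdFor(long epochMillis, String pattern, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long ts = 1578510060000L; // 2020-01-08T19:01:00Z

        // One bucket per day.
        System.out.println(bucketIdFor(ts, "yyyy-MM-dd", ZoneOffset.UTC));
        // One bucket per hour.
        System.out.println(bucketIdFor(ts, "yyyy-MM-dd--HH", ZoneOffset.UTC));
    }
}
```

Passing "yyyy-MM-dd" instead of an hourly pattern gives one bucket per day, which is the date-wise layout asked about further down the thread.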
> If that doesn't work for you, please let me know. Btw, are you using event
> or processing time?
>
> Best,
>
> Arvid
>
> On Fri, Dec 27, 2019 at 4:24 AM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Anuj,
>>
>> Actually, I am not familiar with how to partition by timestamp. Flink's
>> streaming BucketingSink provides this feature [1]. You may refer to this
>> link and customize your sink.
>>
>> I can ping a committer who knows the FS connector in more detail than I
>> do; @kklou...@gmail.com <kklou...@gmail.com> may be able to help.
>>
>> Best,
>> Vino
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink
>>
>> On Fri, Dec 27, 2019 at 1:51 AM aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks Vino.
>>>
>>> I am able to write data in Parquet now. The remaining issue is how to
>>> write a DataSet to multiple output paths, partitioned by timestamp.
>>> I want to partition the data by date.
>>>
>>> Currently I am writing it like this, which writes to a single output path.
>>>
>>> DataSet<Tuple2<Void, GenericRecord>> df = allEvents
>>>     .flatMap(new EventMapProcessor(schema.toString()))
>>>     .withParameters(configuration);
>>>
>>> Job job = Job.getInstance();
>>> AvroParquetOutputFormat.setSchema(job, book_bike.getClassSchema());
>>> HadoopOutputFormat<Void, GenericRecord> parquetFormat =
>>>     new HadoopOutputFormat<Void, GenericRecord>(new AvroParquetOutputFormat(), job);
>>> FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
>>>
>>> df.output(parquetFormat);
>>> env.execute();
>>>
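To make the date-wise layout concrete, here is a minimal sketch in plain Java, with no Flink or Parquet dependencies, of splitting events into one output directory per date. Everything in it is an assumption for illustration: the `Event` class, the helper names, the Hive-style `date=` directory naming, the placeholder base path, and the use of UTC. It only shows how records map to paths; each group would still need its own writer.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DatePartitionSketch {

    // Hypothetical event type: an epoch-millis timestamp plus the raw JSON payload.
    static final class Event {
        final long timestampMillis;
        final String json;

        Event(long timestampMillis, String json) {
            this.timestampMillis = timestampMillis;
            this.json = json;
        }
    }

    // Derives the output directory for one event, e.g. <base>/date=2019-12-26.
    static String outputPathFor(String baseDir, Event event) {
        LocalDate date = Instant.ofEpochMilli(event.timestampMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        return baseDir + "/date=" + date;
    }

    // Groups events by their derived output directory (sorted by path).
    static Map<String, List<Event>> partitionByDate(String baseDir, List<Event> events) {
        Map<String, List<Event>> partitions = new TreeMap<>();
        for (Event event : events) {
            partitions
                .computeIfAbsent(outputPathFor(baseDir, event), key -> new ArrayList<>())
                .add(event);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event(1577318400000L, "{\"id\":1}"),   // 2019-12-26T00:00:00Z
            new Event(1577404799000L, "{\"id\":2}"),   // 2019-12-26T23:59:59Z
            new Event(1577404800000L, "{\"id\":3}"));  // 2019-12-27T00:00:00Z

        // "s3://my-bucket/events" is a placeholder base directory.
        partitionByDate("s3://my-bucket/events", events)
            .forEach((path, group) ->
                System.out.println(path + " -> " + group.size() + " event(s)"));
    }
}
```

In a batch job, the derived path (or just the date string) could serve as the grouping key, so that all records for one date end up under the same directory.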
>>>
>>> Please suggest.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Mon, Dec 23, 2019 at 12:59 PM vino yang <yanghua1...@gmail.com>
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> After searching on GitHub, I found a demo repository showing how to use
>>>> Parquet in Flink [1].
>>>>
>>>> You can have a look. I am not sure whether it will be helpful.
>>>>
>>>> [1]: https://github.com/FelixNeutatz/parquet-flinktacular
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Sat, Dec 21, 2019 at 7:03 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am receiving a set of events in JSON and dumping them into hourly
>>>>> buckets in S3.
>>>>> I read one of these hourly buckets into a DataSet<String>.
>>>>>
>>>>> I want to write this dataset as Parquet, but I am not able to figure
>>>>> out how. Can somebody help me with this?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>

