Hi Hawin! If you end up writing code that produces such output into different files/partitions, it would be amazing if you could contribute that code to Flink.
It seems like a very common use case, so this functionality will be useful to other users as well!

Greetings,
Stephan

On Tue, Jun 23, 2015 at 3:36 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Dear Hawin,
>
> We do not have out-of-the-box support for that; it is something you would
> need to implement yourself in a custom SinkFunction.
>
> Best,
>
> Marton
>
> On Mon, Jun 22, 2015 at 11:51 PM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>
>> Hi Marton,
>>
>> If we receive a huge amount of data from Kafka and write it to HDFS
>> immediately, we should use the buffer timeout described in your URL.
>> I am not sure whether you have Flume experience; in Flume, the buffer
>> size and the partitions can be configured as well.
>>
>> What about partitions?
>> For example, I want to write a 1-minute buffer file to HDFS under
>> /data/flink/year=2015/month=06/day=22/hour=21.
>> If the partition (/data/flink/year=2015/month=06/day=22/hour=21) already
>> exists, there is no need to create it; otherwise, Flume creates it
>> automatically, so the incoming data always lands in the right partition.
>>
>> I am not sure whether Flink also provides a similar partitioning API or
>> configuration for this.
>> Thanks.
>>
>> Best regards
>> Hawin
>>
>> On Wed, Jun 10, 2015 at 10:31 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>
>>> Thanks Marton,
>>> I will use this code to implement my testing.
>>>
>>> Best regards
>>> Hawin
>>>
>>> On Wed, Jun 10, 2015 at 1:30 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>>>
>>>> Dear Hawin,
>>>>
>>>> You can pass an HDFS path to DataStream's and DataSet's writeAsText and
>>>> writeAsCsv methods.
>>>> I assume that you are running a streaming topology, because your source
>>>> is Kafka, so it would look like the following:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> env.addSource(new PersistentKafkaSource(..))
>>>>    .map(/* your operations */)
>>>>    .writeAsText("hdfs://<namenode_name>:<namenode_port>/path/to/your/file");
>>>>
>>>> Check out the relevant section of the streaming docs for more info. [1]
>>>>
>>>> [1] http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#connecting-to-the-outside-world
>>>>
>>>> Best,
>>>>
>>>> Marton
>>>>
>>>> On Wed, Jun 10, 2015 at 10:22 AM, Hawin Jiang <hawin.ji...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Can someone tell me what is the best way to write data to HDFS when
>>>>> Flink receives data from Kafka?
>>>>>
>>>>> Big thanks for your example.
>>>>>
>>>>> Best regards
>>>>>
>>>>> Hawin
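[Editor's note] The time-bucketed partition layout Hawin describes (e.g. /data/flink/year=2015/month=06/day=22/hour=21) boils down to mapping an event timestamp to a directory path, which a custom SinkFunction could do before writing each bucket. The helper below is a minimal sketch of that mapping in plain Java; the class name PartitionPath and its methods are hypothetical illustrations, not part of the Flink or Flume API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/**
 * Hypothetical helper a custom SinkFunction might call to resolve
 * the hourly partition directory for an event timestamp, e.g.
 * /data/flink/year=2015/month=06/day=22/hour=21.
 */
public class PartitionPath {

    private final String basePath;

    public PartitionPath(String basePath) {
        this.basePath = basePath;
    }

    /** Formats the hourly partition directory for the given epoch millis (UTC). */
    public String forTimestamp(long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format("%s/year=%04d/month=%02d/day=%02d/hour=%02d",
                basePath, t.getYear(), t.getMonthValue(),
                t.getDayOfMonth(), t.getHour());
    }

    public static void main(String[] args) {
        PartitionPath p = new PartitionPath("/data/flink");
        // 2015-06-22T21:00:00Z, matching the example in the thread
        long ts = ZonedDateTime.of(2015, 6, 22, 21, 0, 0, 0, ZoneOffset.UTC)
                .toInstant().toEpochMilli();
        System.out.println(p.forTimestamp(ts));
        // prints /data/flink/year=2015/month=06/day=22/hour=21
    }
}
```

A sink would then create the directory if it does not exist (mirroring the Flume behavior described above) and append the buffered records to a file inside it.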