Hi Radu,

It is indeed interesting to know how each window could be registered separately 
-  I am not sure it any of the existing mechanisms in Flink support this.
I think you need to create your own output sink. It is a bit tricky to pass the 
window sequence number (actually I do  not think such an index is kept – but 
you can create one by yourself). Maybe an easier option is to manage the 
writing of the data yourself in the window function or in a custom created 
evictor. In the window and in the evictor you have access to all data and you 
can create specific files for each window triggered



From: Radu Prodan [mailto:raduprod...@gmail.com]
Sent: Thursday, February 04, 2016 11:58 AM
To: user@flink.apache.org
Subject: Re: Flink writeAsCsv

Hi Marton,

Thanks to your comment I managed to get it worked. At least it outputs the 
results. However, what I need is to output each window result seperately.  Now, 
it outputs the results of parallel working windows (I think) and appends the 
new results to them. For example, If I have parallelism of 10, then I will have 
at most 10 files and each file will grow in size as windows continue.
What I want is, to have seperate file for a window. For example, after n'th 
window is computed output it to some file and close the file.

-best
Radu

On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi 
<balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>> wrote:
Hey Radu,

As you are using the streaming api I assume that you call env.execute() in both 
cases. Is that the case?

Do you see any errors appearing? My first call would be if your data type is 
not a tuple type then writeAsCsv does not work by default.

Best,

Marton

On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan 
<raduprod...@gmail.com<mailto:raduprod...@gmail.com>> wrote:
Hi all,

I am new to flink. I wrote a simple program and I want it to output as csv file.


timeWindowAll(Time.of(3, TimeUnit.MINUTES))

.apply(newFunction1())

.writeAsCsv("file:///user/someuser/Documents/somefile.csv<file:///\\user\someuser\Documents\somefile.csv>");



When I change the sink to . print(), it works and outputs some results.

I want it to output the result of every window. However, it outputs nothing and 
the file is not created. Am I missing anything?



-best

Radu







Reply via email to