You can measure the time of each iteration in the open() methods of the
operators within the iteration. open() is called before each iteration.
The times can be collected either by printing to stdout (you then need to
collect the output files afterwards) or by implementing a list accumulator.
Each time record should include the iteration number and the parallel task id.
After the execution, the accumulator will be available in the execution
result.
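
To make the post-processing step concrete, here is a minimal sketch in plain
Java of how such records could be turned into per-iteration durations once
they have been read back from the execution result. The class IterationTimes,
its methods, and the "iteration,subtask,timestamp" record format are
hypothetical and not part of Flink. The Flink side is left out on purpose: in
a rich function the iteration number should be available from
getIterationRuntimeContext().getSuperstepNumber() and the task id from
getRuntimeContext().getIndexOfThisSubtask(), but please check that against
your Flink version. Differences are taken per subtask, so timestamps from
different machines are never compared with each other.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Flink class) for timestamps recorded in open(). */
final class IterationTimes {

    /** Encodes one record as "iteration,subtask,timestampMillis". */
    static String record(int iteration, int subtask, long timestampMillis) {
        return iteration + "," + subtask + "," + timestampMillis;
    }

    /**
     * For every iteration that has a successor, returns the longest time any
     * single subtask spent between its open() call for that iteration and its
     * open() call for the next one. The last iteration is omitted because no
     * later open() call marks its end.
     */
    static Map<Integer, Long> maxDurationPerIteration(List<String> records) {
        // subtask -> (iteration -> open() timestamp)
        Map<Integer, TreeMap<Integer, Long>> bySubtask = new TreeMap<>();
        for (String r : records) {
            String[] fields = r.split(",");
            int iteration = Integer.parseInt(fields[0]);
            int subtask = Integer.parseInt(fields[1]);
            long timestamp = Long.parseLong(fields[2]);
            bySubtask.computeIfAbsent(subtask, k -> new TreeMap<>())
                     .put(iteration, timestamp);
        }

        Map<Integer, Long> result = new TreeMap<>();
        for (TreeMap<Integer, Long> opens : bySubtask.values()) {
            for (Map.Entry<Integer, Long> e : opens.entrySet()) {
                Long next = opens.get(e.getKey() + 1);
                if (next != null) {
                    // Keep the slowest subtask for this iteration.
                    result.merge(e.getKey(), next - e.getValue(), Long::max);
                }
            }
        }
        return result;
    }
}
```

Inside open() you would add something like
IterationTimes.record(iteration, subtask, System.currentTimeMillis()) to the
list accumulator and call maxDurationPerIteration(...) on the list obtained
from the execution result.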

Accumulators can of course also be used to collect the number of messages, etc.

Best, Fabian

2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> Why don't you use Flink's DataSet output functions (like writeAsText,
> writeAsCsv, etc.)?
> Or, if they are not sufficient, you can implement/override your own
> OutputFormat.
>
> In my experience, static variables are evil in distributed
> environments.
> Moreover, one of the main strengths of Flink is its input/output APIs, so I
> would avoid writing to a file in that way.
>
> Of course, dataset.append() would be a very convenient API to add (IMHO).
>
> Best,
> Flavio
>
>
> On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
> > Hey guys,
> >
> > Me again :) So now that my wonderful job finishes, I would like to monitor
> > it a bit (i.e. build some charts on the number of messages per vertex,
> > compute the total amount of time elapsed per computation per vertex, etc).
> >
> > The main computationally intensive operation is a coGroup. There, within
> > the iteration, I count the number of "messages" sent and then simply do:
> >
> > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> >
> > The problem is that with this approach, I get a deadlock (yes!! Now that I
> > know the code itself works, I am positive that the deadlock comes from the
> > append; this relates to my previous mail). It is to be expected if you come
> > to think of it: 200-something threads are trying to write to the same file...
> >
> > A possible workaround is this one:
> >
> > public class Singleton {
> >     private static final Singleton inst= new Singleton();
> >
> >     private Singleton() {
> >         super();
> >     }
> >
> >     public synchronized void writeToFile(String str) {
> >         // Do whatever
> >     }
> >
> >     public static Singleton getInstance() {
> >         return inst;
> >     }
> > }
> >
> > Singleton.getInstance().writeToFile("Hello!!");
> >
> > However, I am not sure how well Flink plays with synchronised....
> >
> > Is there a smarter way to do it?
> >
> > Thanks!
> >
> > Andra
> >
>
