Hi Andra,

sure, if you do the logging for each record (or group of records) using a
list accumulator is a very bad idea.

If you don't need exact stats for each vertex but rather a distribution
over all vertices, you can use a histogram accumulator.
If you need exact vertex stats, I'd go with Vasia's proposal.

2015-06-29 22:48 GMT+02:00 Vasiliki Kalavri <vasilikikala...@gmail.com>:

> Andra,
>
> why don't you simply print to standard output and gather your metrics from
> the taskmanagers' log files after execution?
> Wouldn't that work for you?
>
> -V.
>
> On 29 June 2015 at 22:36, Andra Lungu <lungu.an...@gmail.com> wrote:
>
> > Caution! I am getting philosophical. Stop me if I'm talking nonsense!
> >
> > You are suggesting a list that will have one or two entries per vertex =
> > (approx) billions. Won't this over-saturate my memory? I am already
> filling
> > it with lots of junk resulted from the computation...
> >
> > On Mon, Jun 29, 2015 at 1:58 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> > > Have you tried to use a custom accumulator that just appends to a list?
> > >
> > > 2015-06-29 12:59 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > >
> > > > Hey Fabian,
> > > >
> > > > I am aware of the way open, preSuperstep(), postSuperstep() etc can
> > help
> > > me
> > > > within an interation, unfortunately I am writing my own method here.
> I
> > > > could try to briefly describe it:
> > > >
> > > > public static final class PropagateNeighborValues implements
> > > > NeighborsFunctionWithVertexValue(...) {
> > > >     @Override
> > > >     public void iterateNeighbors(Iterable..., Collector...) {
> > > >              while(iterator.hasNext) neighbors++;
> > > >              // and I would need something like
> > > >              appendToFile(myAwesomeFile, neighbors);
> > > >     }
> > > > }
> > > >
> > > > Open() and synchronised are definitely not doing the trick for me
> right
> > > > now.
> > > > Any other way !? :(
> > > >
> > > > On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fhue...@gmail.com>
> > > wrote:
> > > >
> > > > > You can measure the time of each iteration in the open() methods
> > > > operators
> > > > > within an iteration. open() will be called before each iteration.
> > > > > The times can be collected by either printing to std out (you need
> to
> > > > > collect the files then...) or by implementing a list accumulator.
> > Each
> > > > time
> > > > > should include the iteration# und parallel task id.
> > > > > After the execution, the acculuator will be available in the
> > execution
> > > > > result.
> > > > >
> > > > > Accumulators can of course also be used to collect number of
> > messages,
> > > > etc.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it
> >:
> > > > >
> > > > > > Why don't you use Flink dataset output functions (like
> writeAsText,
> > > > > > writeAsCsv, etc..)?
> > > > > > Or if they are not sufficient you can implement/override your own
> > > > > > InputFormat.
> > > > > >
> > > > > > From what is my experience static variables are evil in
> distributed
> > > > > > environments..
> > > > > > Moreover, one of the main strengths of Flink are its input/output
> > > APIs
> > > > > so I
> > > > > > would avoid to write to a file in that way.
> > > > > >
> > > > > > Of course, dataset.append() will be a very convenient API to add
> > > > (IMHO).
> > > > > >
> > > > > > Best,
> > > > > > Flavio
> > > > > >
> > > > > >
> > > > > > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <
> > lungu.an...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey guys,
> > > > > > >
> > > > > > > Me again :) So now that my wonderful job finishes, I would like
> > to
> > > > > > monitor
> > > > > > > it a bit (i.e. build some charts on the number of messages per
> > > > vertex,
> > > > > > > compute the total amount of time elapsed per computation per
> > > vertex,
> > > > > > etc).
> > > > > > >
> > > > > > > The main computational-intensive operation is a coGroup. There,
> > > > within
> > > > > > the
> > > > > > > iteration I count the number of "messages" sent and then I do
> > > simple:
> > > > > > >
> > > > > > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > > > > > >
> > > > > > > The problem is that with this approach, I get a deadlock (yes!!
> > Now
> > > > > that
> > > > > > I
> > > > > > > know the code itself works I am positive that the deadlock
> comes
> > > from
> > > > > the
> > > > > > > append -this regarding my previous mail-). It is normal if you
> > come
> > > > to
> > > > > > > think of it 200 something threads are trying to write to the
> same
> > > > > file...
> > > > > > >
> > > > > > > A possible workaround is this one:
> > > > > > >
> > > > > > > public class Singleton {
> > > > > > >     private static final Singleton inst= new Singleton();
> > > > > > >
> > > > > > >     private Singleton() {
> > > > > > >         super();
> > > > > > >     }
> > > > > > >
> > > > > > >     public synchronized void writeToFile(String str) {
> > > > > > >         // Do whatever
> > > > > > >     }
> > > > > > >
> > > > > > >     public Singleton getInstance() {
> > > > > > >         return inst;
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > > Singleton.getInstance().writeToFile("Hello!!");
> > > > > > >
> > > > > > > However, I am not sure how well Flink plays with
> synchronised....
> > > > > > >
> > > > > > > Is there a smarter way to do it?
> > > > > > >
> > > > > > > Thanks!
> > > > > > >
> > > > > > > Andra
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to