To add to it, my pipeline is a simple

keyBy(0)
        .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
        .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
        .reduce(new ReduceFunction(), new WindowFunction())


On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Hello folks,
>
> As far as I know checkpoint failure should be ignored and retried with
> potentially larger state. I had this situation
>
> * hdfs went into a safe mode b'coz of Name Node issues
> * exception was thrown
>
>     
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby. Visit
> https://s.apache.org/sbnn-error
>     ..................
>
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(Had
> oopFileSystem.java:453)
>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(S
> afetyNetWrapperFileSystem.java:111)
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStream
> Factory.createBasePath(FsCheckpointStreamFactory.java:132)
>
> * The pipeline came back after a few restarts and checkpoint failures,
> after the hdfs issues were resolved.
>
> I would not have worried about the restart, but it was evident that I lost
> my operator state. Either it was my kafka consumer that kept on advancing
> it's offset between a start and the next checkpoint failure ( a minute's
> worth ) or the the operator that had partial aggregates was lost. I have a
> 15 minute window of counts on a keyed operator
>
> I am using ROCKS DB and of course have checkpointing turned on.
>
> The questions thus are
>
> * Should a pipeline be restarted if checkpoint fails ?
> * Why on restart did the operator state did not recreate ?
> * Is the nature of the exception thrown have to do with any of this b'coz
> suspend and resume from a save point work as expected ?
> * And though I am pretty sure, are operators like the Window operator
> stateful by drfault and thus if I have timeWindow(Time.of(window_size,
> TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()), the
> state is managed by flink ?
>
> Thanks.
>

Reply via email to