To add to this, my pipeline is simply:

keyBy(0)
    .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
    .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
    .reduce(new ReduceFunction(), new WindowFunction())
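
To make clear what state I expect to survive a restart, here is a minimal plain-Java sketch of what the pipeline above implies. It is not Flink API and not my actual job code. As I understand it, a keyed time window with an incremental reduce holds one partial aggregate per key and window. The class name WindowReduceSketch and its methods add and partial are hypothetical, chosen only for illustration, and the sketch ignores allowed lateness, triggers, and checkpointing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (NOT the Flink API) of the state a keyed tumbling window
// with an incremental reduce keeps: one partial aggregate per (key, window).
class WindowReduceSketch {
    private final long windowSizeMs;
    private final BinaryOperator<Long> reducer;
    // key -> (window start timestamp -> partial aggregate)
    private final Map<String, Map<Long, Long>> state = new HashMap<>();

    WindowReduceSketch(long windowSizeMs, BinaryOperator<Long> reducer) {
        this.windowSizeMs = windowSizeMs;
        this.reducer = reducer;
    }

    // Start of the tumbling window that contains the given timestamp.
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Fold one element into the partial aggregate of its (key, window).
    void add(String key, long timestampMs, long value) {
        state.computeIfAbsent(key, k -> new HashMap<>())
             .merge(windowStart(timestampMs), value, reducer);
    }

    // Current partial aggregate, or null if nothing was seen for that (key, window).
    Long partial(String key, long windowStartMs) {
        Map<Long, Long> windows = state.get(key);
        return windows == null ? null : windows.get(windowStartMs);
    }

    public static void main(String[] args) {
        long fifteenMin = 15 * 60 * 1000L;
        WindowReduceSketch op = new WindowReduceSketch(fifteenMin, Long::sum);
        op.add("k", 1_000L, 1L);           // first window
        op.add("k", 61_000L, 1L);          // same window, one minute later
        op.add("k", fifteenMin + 5L, 1L);  // next window
        System.out.println(op.partial("k", 0L));          // prints 2
        System.out.println(op.partial("k", fifteenMin));  // prints 1
    }
}
```

If the job restarts without restoring this map from a checkpoint, the count of 2 in the first window is gone and the window starts again from whatever the source replays, which matches what I observed.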

On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Hello folks,
>
> As far as I know, a checkpoint failure should be ignored and the checkpoint
> retried, with potentially larger state. I had this situation:
>
> * HDFS went into safe mode because of NameNode issues
> * an exception was thrown
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
> Operation category WRITE is not supported in state standby. Visit
> https://s.apache.org/sbnn-error
> ..................
>
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:111)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.createBasePath(FsCheckpointStreamFactory.java:132)
>
> * The pipeline came back after a few restarts and checkpoint failures,
>   once the HDFS issues were resolved.
>
> I would not have worried about the restarts, but it was evident that I lost
> my operator state. Either my Kafka consumer kept advancing its offset
> between a start and the next checkpoint failure (a minute's worth), or the
> operator state holding the partial aggregates was lost. I have a 15 minute
> window of counts on a keyed operator.
>
> I am using RocksDB and of course have checkpointing turned on.
>
> The questions thus are:
>
> * Should a pipeline be restarted if a checkpoint fails?
> * Why was the operator state not recreated on restart?
> * Does the nature of the exception thrown have anything to do with this?
>   I ask because suspend and resume from a savepoint work as expected.
> * And, though I am pretty sure, are operators like the window operator
>   stateful by default, so that if I have
>   timeWindow(Time.of(window_size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new WindowFunction()),
>   the state is managed by Flink?
>
> Thanks.