Hi!

So far we are not aware of a state loss bug in Flink. My guess is that it is some subtlety in the program.

The check that logs the warning also has other conditions, like "handHistoryInfo.playType == PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?

To debug this, you can try the following: rather than using Flink's key/value state, simply use your own Java/Scala map in the RichFlatMapFunction. That is not fault-tolerant by default, but you can use it to see whether the error occurs in the same way or not.

Greetings,
Stephan

On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> Hi.
> I checked the order of the data, but it is alright.
> Are there any other possibilities?
> Thank you.
>
> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> It's not that easy to say at first glance.
>>
>> One thing that is important to bear in mind is what ordering guarantees
>> Flink gives, and where the ordering guarantees are not given.
>> When you use keyBy() or redistribute(), order is preserved per parallel
>> source/target pair only.
>>
>> Have a look here:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>
>> Could it be that the events simply arrive in a different order in the
>> functions, so that a later event that looks for state comes before an
>> earlier event that creates the state?
>>
>> Greetings,
>> Stephan
>>
>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>
>>> Nope.
>>> I added a log in End,
>>> but the same log still appears.
>>> Is there any fault in my code?
>>>
>>> Thank you.
>>>
>>> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>> You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>>> event comes in before "CommCardHistory" where you check the state.
>>>>
>>>> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>> In my code, is the config of the ExecutionEnv alright?
>>>>>
>>>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>>
>>>>>> My code and log are as below.
>>>>>>
>>>>>> val getExecuteEnv: StreamExecutionEnvironment = {
>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>>>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(60000)
>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>>>>>   env
>>>>>> }
>>>>>>
>>>>>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>>>>>   target.keyBy(_._3).flatMap(new StateOperator)
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>>   val env = getExecuteEnv
>>>>>>   val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>>>>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>>>>   val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>>>>>   val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>>>>>   ...
>>>>>> }
>>>>>>
>>>>>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>>>>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>>>>   var handState: ValueState[HandHistoryInfo] = _
>>>>>>
>>>>>>   override def open(param: Configuration): Unit = {
>>>>>>     val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>>>>       classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>>>>>>     playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>>>>>     handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>>>>   }
>>>>>>
>>>>>>   override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>>>>>>     in._2 match {
>>>>>>       case "GameStartHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>>>>>         val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>>>>         val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>>>>>         if (LOG.isInfoEnabled())
>>>>>>           LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>>>>>         ...
>>>>>>         playerState.update(players)
>>>>>>         handState.update(handHistoryInfo)
>>>>>>       case "HoleCardHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         if (players != null) {
>>>>>>           ...
>>>>>>           playerState.update(players)
>>>>>>         } else LOG.warn("there is no player[hole card]. {}", in._4)
>>>>>>       case "PlayerStateHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         if (players != null) {
>>>>>>           ...
>>>>>>           playerState.update(players)
>>>>>>         } else LOG.warn("there is no player[player state]. {}", in._4)
>>>>>>       case "CommCardHistory" =>
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val commCardHistory: CommCardHistory = commCardState.value()
>>>>>>         if (handHistoryInfo != null) {
>>>>>>           ...
>>>>>>           handState.update(handHistoryInfo)
>>>>>>           commCardState.update(commCardHistory)
>>>>>>         } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>>>>>       case "PlayerActionHistory" =>
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val players = playerState.value()
>>>>>>
>>>>>>         if (handHistoryInfo != null) {
>>>>>>           ...
>>>>>>         } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>>>>>       case "PotHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         val commCardHistory: CommCardHistory = commCardState.value()
>>>>>>         if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>>>>>>           ...
>>>>>>         } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>>>>>       case "GameEndHistory" =>
>>>>>>         val players = playerState.value()
>>>>>>         val handHistoryInfo = handState.value()
>>>>>>         ...
>>>>>>         if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>>>>>         playerState.clear()
>>>>>>         handState.clear()
>>>>>>       case _ =>
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> -- log --
>>>>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>>>>>>
>>>>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>>>>>>
>>>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>
>>>>>>> What exactly do you mean by lost?
>>>>>>>
>>>>>>> You call value() and it returns a value (!= null/defaultValue), and you
>>>>>>> call it again and it returns null/defaultValue for the same key with
>>>>>>> no update in between?
>>>>>>>
>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>>>>>> <k.klou...@data-artisans.com> wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Could you share the code of the job you are running?
>>>>>>>> With only this information, I am afraid we cannot help much.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi.
>>>>>>>>> I'm using Flink 1.0.3 on AWS EMR.
>>>>>>>>> Sporadically, the value of a ValueState is lost.
>>>>>>>>> What is the starting point for solving this problem?
>>>>>>>>> Thank you.
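Stephan's debugging suggestion at the top of the thread is to keep the state in an ordinary Scala map inside the RichFlatMapFunction instead of in Flink's keyed ValueState. That approach has one pitfall. A single function instance processes many keys, so the map has to be keyed explicitly by the same field the stream is keyed on (`keyBy(_._3)` in the quoted code). The following is a minimal sketch under that assumption. `PlainKeyedState` and `lastEventFor` are hypothetical debugging helpers, not Flink API. The map is neither checkpointed nor restored after a failure.

```scala
import scala.collection.mutable

// Debug-only stand-in for ONE keyed ValueState (hypothetical helper, not Flink API).
// A RichFlatMapFunction instance handles many keys, so values are stored per key;
// pass the same field the stream is keyed on (in._3 for keyBy(_._3)).
// Not fault tolerant: nothing here is checkpointed or restored after a restart.
final class PlainKeyedState[V] {
  private val values = mutable.HashMap.empty[String, V]
  // Last event type that updated or cleared each key, for use in warning messages.
  private val lastEvent = mutable.HashMap.empty[String, String]

  /** Like ValueState.value(), but returns None when nothing is stored for the key. */
  def value(key: String): Option[V] = values.get(key)

  /** Like ValueState.update(), also remembering which event wrote the value. */
  def update(key: String, event: String, v: V): Unit = {
    values.update(key, v)
    lastEvent.update(key, event)
  }

  /** Like ValueState.clear(), also remembering which event cleared the key. */
  def clear(key: String, event: String): Unit = {
    values.remove(key)
    lastEvent.update(key, event)
  }

  /** The last event that touched the key, or "<none>" if the key was never seen. */
  def lastEventFor(key: String): String = lastEvent.getOrElse(key, "<none>")
}
```

In `StateOperator`, declare a field such as `var handStates: PlainKeyedState[HandHistoryInfo] = _` and assign it in `open()`, like the existing state fields, so that the function object itself stays serializable. Then replace the state calls as follows:

- `handState.value()` becomes `handStates.value(in._3).orNull`.
- `handState.update(h)` becomes `handStates.update(in._3, in._2, h)`.
- `handState.clear()` becomes `handStates.clear(in._3, in._2)`.

If the `[pot]` warning still appears with this map and no restart happened in between, the cause lies in the program logic or the event order rather than in Flink's state handling. Adding `handStates.lastEventFor(in._3)` to the warning shows which event last touched the key.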
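The replies point to two ways the `[pot]` warning can fire even when no state was actually lost:

- Events of one key may arrive in a different order than expected. This covers Stephan's ordering remark and Max's point that `GameEndHistory` clears `handState`.
- The compound `PotHistory` condition may fail on `playType` or `players.size` rather than on a missing hand, which is Stephan's point in his last message.

The Flink-free sketch below tells these cases apart. It replays the event types of a single key through the same clear-on-`GameEndHistory` logic and reports which sub-condition failed. `Hand`, its `isCash` flag, and the player-id `Set` are hypothetical stand-ins for `HandHistoryInfo`, `PlayType.Cash`, and the `util.Map[String, PotPlayer]` state.

```scala
// Simplified, Flink-free model of the PotHistory check in StateOperator.
// Hand and isCash are hypothetical stand-ins for HandHistoryInfo and
// `playType == PlayType.Cash`; a set of player ids stands in for the player map.
object PotDiagnostics {
  final case class Hand(id: String, isCash: Boolean)

  /** Same checks, in the same order, as the PotHistory branch, but reports
    * which one failed. Returns None when the pot would be processed. */
  def potSkipReason(hand: Option[Hand], players: Option[Set[String]]): Option[String] =
    hand match {
      case None                 => Some("no hand state")
      case Some(h) if !h.isCash => Some(s"hand ${h.id} is not a cash game")
      case Some(_) =>
        players match {
          case None                     => Some("no player state")
          case Some(ps) if ps.size <= 1 => Some(s"only ${ps.size} player(s)")
          case Some(_)                  => None
        }
    }

  /** Replays the event types of ONE key in the given order and returns the
    * warnings the PotHistory branch would log. GameStartHistory sets both
    * states; GameEndHistory clears both, as in the quoted flatMap. */
  def replay(events: Seq[String], hand: Hand, players: Set[String]): Seq[String] = {
    var handState: Option[Hand] = None
    var playerState: Option[Set[String]] = None
    val warnings = Seq.newBuilder[String]
    events.foreach {
      case "GameStartHistory" =>
        handState = Some(hand)
        playerState = Some(players)
      case "PotHistory" =>
        potSkipReason(handState, playerState).foreach(r => warnings += s"[pot] $r")
      case "GameEndHistory" =>
        handState = None
        playerState = None
      case _ => () // other event types do not affect this check
    }
    warnings.result()
  }
}
```

For example, replaying `GameStartHistory, GameEndHistory, PotHistory` yields only `"[pot] no hand state"`, while a non-cash hand processed in the expected order yields `"[pot] hand h2 is not a cash game"`. Logging the reason string instead of the single fixed message would show directly whether `handState` is really empty when `PotHistory` arrives.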