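You're clearing "handState" when a "GameEndHistory" event arrives. I'm assuming that, for the affected key, the "GameEndHistory" event comes in before the "CommCardHistory" event where you check the state, so by the time you call value() the state has already been cleared and you get null back.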
On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote: > in my code, is the config of ExecutionEnv alright? > > >> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote: >> >> >> my code and log is as below. >> >> >> val getExecuteEnv: StreamExecutionEnvironment = { >> val env = >> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000) >> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) >> >> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) >> env.getCheckpointConfig.setCheckpointTimeout(60000) >> env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000)) >> env >> } >> >> def transform(target: DataStream[(String, String, String, String, Long)]): >> DataStream[WinLossBase] = >> target.keyBy(_._3).flatMap(new StateOperator) >> >> def main(args: Array[String]) { >> val env = getExecuteEnv >> val source: DataStream[String] = >> extractFromKafka(env).name("KafkaSource") >> val json = deserializeToJsonObj(source).name("ConvertToJson") >> val target: DataStream[(String, String, String, String, Long)] = >> preTransform(json) >> val result: DataStream[WinLossBase] = >> transform(target).name("ToKeyedStream”) >> … >> } >> >> class StateOperator extends RichFlatMapFunction[(String, String, String, >> String, Long), WinLossBase] { >> var playerState: ValueState[util.Map[String, PotPlayer]] = _ >> var handState: ValueState[HandHistoryInfo] = _ >> >> override def open(param: Configuration): Unit = { >> val playerValueStateDescriptor = new >> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss", >> classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, >> PotPlayer]()) >> playerState = >> getRuntimeContext.getState(playerValueStateDescriptor) >> handState = getRuntimeContext.getState(new >> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null)) >> } >> >> override def flatMap(in: 
(String, String, String, String, Long), out: >> Collector[WinLossBase]): Unit = { >> in._2 match { >> case "GameStartHistory" => >> val players = playerState.value() >> val obj = _convertJsonToRecord(in._4, >> classOf[GameStartHistoryRecord]) >> val record = obj.asInstanceOf[GameStartHistoryRecord] >> val handHistoryInfo: HandHistoryInfo = >> _setUpHandHistoryInfo(record) >> if (LOG.isInfoEnabled()) >> LOG.info("hand start {}", if (handHistoryInfo != >> null) handHistoryInfo.handHistoryId else "NULL”) >> …. >> playerState.update(players) >> handState.update(handHistoryInfo) >> case "HoleCardHistory" => >> val players = playerState.value() >> if (players != null) { >> ... >> playerState.update(players) >> } else LOG.warn("there is no player[hole card]. {}", >> in._4) >> case "PlayerStateHistory" => >> val players = playerState.value() >> if (players != null) { >> …. >> playerState.update(players) >> } else LOG.warn("there is no player[player state]. {}", >> in._4) >> case "CommCardHistory" => >> val handHistoryInfo = handState.value() >> val commCardHistory: CommCardHistory = >> commCardState.value() >> if (handHistoryInfo != null) { >> ... >> handState.update(handHistoryInfo) >> commCardState.update(commCardHistory) >> } else LOG.warn("there is no handhistory info[comm card]. >> {}", in._4) >> case "PlayerActionHistory" => >> val handHistoryInfo = handState.value() >> val players = playerState.value() >> >> if (handHistoryInfo != null) { >> ... >> } else LOG.warn("there is no handhistory info[player >> action]. {}", in._4) >> case "PotHistory" => >> val players = playerState.value() >> val handHistoryInfo = handState.value() >> val commCardHistory: CommCardHistory = >> commCardState.value() >> if (handHistoryInfo != null && handHistoryInfo.playType >> == PlayType.Cash && players != null && players.size > 1) { >> ... >> } else LOG.warn("there is no handhistory info[pot]. 
{}", >> in._4) >> case "GameEndHistory" => >> val players = playerState.value() >> val handHistoryInfo = handState.value() >> ... >> if (LOG.isTraceEnabled()) LOG.trace("end {}", >> record.getHandHistoryId) >> playerState.clear() >> handState.clear() >> case _ => >> } >> } >> >> —— log —— >> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, >> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) >> (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start >> 5769392597641628595 >> >> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, >> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) >> (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no >> handhistory info[pot]. >> >>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote: >>> >>> What do you mean with lost exactly? >>> >>> You call value() and it returns a value (!= null/defaultValue) and you >>> call it again and it returns null/defaultValue for the same key with >>> no update in between? >>> >>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas >>> <k.klou...@data-artisans.com> wrote: >>>> Hello, >>>> >>>> Could you share the code of the job you are running? >>>> With only this information I am afraid we cannot help much. >>>> >>>> Thanks, >>>> Kostas >>>> >>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote: >>>>> >>>>> Hi. >>>>> I’m using flink 1.0.3 on aws EMR. >>>>> sporadically value of ValueState is lost. >>>>> what is starting point for solving this problem. >>>>> Thank you. >>>> >> >