My code and log are as below.
val getExecuteEnv: StreamExecutionEnvironment = { val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000) env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) env.getCheckpointConfig.setCheckpointTimeout(60000) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000)) env } def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] = target.keyBy(_._3).flatMap(new StateOperator) def main(args: Array[String]) { val env = getExecuteEnv val source: DataStream[String] = extractFromKafka(env).name("KafkaSource") val json = deserializeToJsonObj(source).name("ConvertToJson") val target: DataStream[(String, String, String, String, Long)] = preTransform(json) val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream”) … } class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] { var playerState: ValueState[util.Map[String, PotPlayer]] = _ var handState: ValueState[HandHistoryInfo] = _ override def open(param: Configuration): Unit = { val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss", classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]()) playerState = getRuntimeContext.getState(playerValueStateDescriptor) handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null)) } override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = { in._2 match { case "GameStartHistory" => val players = playerState.value() val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord]) val record = obj.asInstanceOf[GameStartHistoryRecord] val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record) if (LOG.isInfoEnabled()) LOG.info("hand 
start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL”) …. playerState.update(players) handState.update(handHistoryInfo) case "HoleCardHistory" => val players = playerState.value() if (players != null) { ... playerState.update(players) } else LOG.warn("there is no player[hole card]. {}", in._4) case "PlayerStateHistory" => val players = playerState.value() if (players != null) { …. playerState.update(players) } else LOG.warn("there is no player[player state]. {}", in._4) case "CommCardHistory" => val handHistoryInfo = handState.value() val commCardHistory: CommCardHistory = commCardState.value() if (handHistoryInfo != null) { ... handState.update(handHistoryInfo) commCardState.update(commCardHistory) } else LOG.warn("there is no handhistory info[comm card]. {}", in._4) case "PlayerActionHistory" => val handHistoryInfo = handState.value() val players = playerState.value() if (handHistoryInfo != null) { ... } else LOG.warn("there is no handhistory info[player action]. {}", in._4) case "PotHistory" => val players = playerState.value() val handHistoryInfo = handState.value() val commCardHistory: CommCardHistory = commCardState.value() if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) { ... } else LOG.warn("there is no handhistory info[pot]. {}", in._4) case "GameEndHistory" => val players = playerState.value() val handHistoryInfo = handState.value() ... 
if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId) playerState.clear() handState.clear() case _ => } } —— log —— 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot]. > On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote: > > What do you mean by "lost" exactly? > > You call value() and it returns a value (!= null/defaultValue) and you > call it again and it returns null/defaultValue for the same key with > no update in between? > > On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas > <k.klou...@data-artisans.com> wrote: >> Hello, >> >> Could you share the code of the job you are running? >> With only this information I am afraid we cannot help much. >> >> Thanks, >> Kostas >> >>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote: >>> >>> Hi. >>> I’m using Flink 1.0.3 on AWS EMR. >>> Sporadically, the value of a ValueState is lost. >>> What is a good starting point for solving this problem? >>> Thank you. >>